You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/20 14:54:07 UTC

[arrow] branch master updated: ARROW-694: [C++] Initial parser interface for reading JSON into RecordBatches

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c19bb6  ARROW-694: [C++] Initial parser interface for reading JSON into RecordBatches
9c19bb6 is described below

commit 9c19bb65c128f9e9a82e19b0c0caf575817e8fb2
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Feb 20 08:53:57 2019 -0600

    ARROW-694: [C++] Initial parser interface for reading JSON into RecordBatches
    
    ( abandoning https://github.com/apache/arrow/pull/3206 )
    
    Adds [`json` sub project](https://github.com/apache/arrow/pull/3592/files#diff-2443c7d7b39b992ea580f0fbd387284a) with:
    
    - BlockParser which parses Buffers of json formatted data into a StructArray with minimal conversion
      * true/false, and null fields are stored in BooleanArray and NullArray respectively
      * strings are stored as indices into a single StringArray
      * numbers are not converted; their string representations are stored alongside string values
      * nested fields are stored as ListArray or StructArray of their parsed (unconverted) children
    - Three approaches to handling unexpected fields:
      1. Error on an unexpected field
      2. Ignore unexpected fields
      3. Infer the type of unexpected fields and add them to the schema
    - [Convenience interface](https://github.com/apache/arrow/pull/3592/files#diff-d043a0249cc485b08d93767d2075bd83R124) for parsing a single chunk of json data into a RecordBatch with fully converted columns
    - Chunker to process a stream of unchunked data for use by BlockParser (not currently used)
    
    Author: Benjamin Kietzman <be...@gmail.com>
    Author: Wes McKinney <we...@apache.org>
    
    Closes #3592 from bkietz/ARROW-694-json-reader-WIP and squashes the following commits:
    
    e42e5d730 <Wes McKinney> Add arrow_dependencies to arrow_flight.so dependencies to fix race condition with Flatbuffers
    d67aff63e <Benjamin Kietzman> adding more comments to parser.cc
    554b595d9 <Benjamin Kietzman> adding explanatory comments to json/parser.cc
    0d0caa991 <Benjamin Kietzman> Add ARROW_PREDICT_* to conditions in parser.cc
    d7d0a2eb6 <Benjamin Kietzman> fix doc error in chunker.h
    29c23128a <Wes McKinney> Disable arrow-json-chunker-test
    76d0431e2 <Wes McKinney> cmake-format
    83150ec13 <Wes McKinney> Restore BinaryBuilder::UnsafeAppend for const std::string&
    05fd9d189 <Benjamin Kietzman> add json project back (merge error)
    69541ea87 <Benjamin Kietzman> correct test util includes
    be720c812 <Benjamin Kietzman> Use compound statements in if()
    baac46735 <Benjamin Kietzman> use const shared_ptr<T>& instead of move
    6e86078f1 <Benjamin Kietzman> Move ParseOne to reader.cc
    1bebda861 <Benjamin Kietzman> clean up Chunker's stream usage
    0d6d92026 <Benjamin Kietzman> disabling chunker test
    c1a7f4bd3 <Benjamin Kietzman> check status for generating list elements
    292672aee <Benjamin Kietzman> add inline tag
    f92f8508c <Benjamin Kietzman> add Status return to Generate
    e3346485e <Benjamin Kietzman> remove misplaced const
    9679b6f76 <Benjamin Kietzman> fix format issue, use SFINAE to detect StringConverter default constructibility
    aaaf9e7e8 <Benjamin Kietzman> remove bitfields
    10e4f0dc4 <Benjamin Kietzman> add missing virtual destructor
    92ffc640d <Benjamin Kietzman> adding ParseOne interface for dead simple parsing
    330615b95 <Benjamin Kietzman> adding first draft of parsing with type inference
    69d7c5c00 <Benjamin Kietzman> Rewrite parser to defer conversion of strings and numbers
    b9d5c3d2d <Benjamin Kietzman> adding Chunker implementation and tests
    2677a575d <Benjamin Kietzman> use recommended loop style
    e39a4a9e0 <Benjamin Kietzman> add (failing) test for '-0' consistency
    0f3a3bc0f <Benjamin Kietzman> Added trivial parser benchmark and data generator
    6472738b3 <Benjamin Kietzman> Refactored type inferrence
    cb6a313d7 <Benjamin Kietzman> adding first draft of type inferrence to BlockParser
    cc3698a44 <Benjamin Kietzman> refactoring Schema::GetFieldIndex to return int
    17176f975 <Benjamin Kietzman> first sketch of JSON parser
---
 cpp/src/arrow/CMakeLists.txt           |    5 +
 cpp/src/arrow/array.cc                 |    6 +-
 cpp/src/arrow/array.h                  |    2 +
 cpp/src/arrow/array/builder_binary.h   |    4 +
 cpp/src/arrow/flight/CMakeLists.txt    |    1 +
 cpp/src/arrow/json/CMakeLists.txt      |   25 +
 cpp/src/arrow/json/api.h               |   24 +
 cpp/src/arrow/json/chunker-test.cc     |  207 +++++++
 cpp/src/arrow/json/chunker.cc          |  212 +++++++
 cpp/src/arrow/json/chunker.h           |   64 ++
 cpp/src/arrow/json/options.cc          |   28 +
 cpp/src/arrow/json/options.h           |   68 +++
 cpp/src/arrow/json/parser-benchmark.cc |   70 +++
 cpp/src/arrow/json/parser-test.cc      |  249 ++++++++
 cpp/src/arrow/json/parser.cc           | 1034 ++++++++++++++++++++++++++++++++
 cpp/src/arrow/json/parser.h            |  128 ++++
 cpp/src/arrow/json/reader.cc           |  298 +++++++++
 cpp/src/arrow/json/reader.h            |   60 ++
 cpp/src/arrow/json/test-common.h       |  149 +++++
 cpp/src/arrow/record_batch.h           |    8 +
 cpp/src/arrow/table.h                  |    8 +
 cpp/src/arrow/type.cc                  |    4 +-
 cpp/src/arrow/type.h                   |    2 +-
 cpp/src/arrow/util/stl.h               |   17 +
 24 files changed, 2669 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 07c404a..39c342b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -104,6 +104,10 @@ set(ARROW_SRCS
     csv/options.cc
     csv/parser.cc
     csv/reader.cc
+    json/options.cc
+    json/chunker.cc
+    json/parser.cc
+    json/reader.cc
     io/buffered.cc
     io/compressed.cc
     io/file.cc
@@ -321,6 +325,7 @@ add_arrow_benchmark(column-benchmark)
 
 add_subdirectory(array)
 add_subdirectory(csv)
+add_subdirectory(json)
 add_subdirectory(io)
 add_subdirectory(util)
 add_subdirectory(vendored)
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 8ceb6ea..563ac25 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -266,8 +266,12 @@ void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
   values_ = MakeArray(data_->child_data[0]);
 }
 
+const ListType* ListArray::list_type() const {
+  return checked_cast<const ListType*>(data_->type.get());
+}
+
 std::shared_ptr<DataType> ListArray::value_type() const {
-  return checked_cast<const ListType&>(*type()).value_type();
+  return list_type()->value_type();
 }
 
 std::shared_ptr<Array> ListArray::values() const { return values_; }
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index f8d451a..35c7bd8 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -486,6 +486,8 @@ class ARROW_EXPORT ListArray : public Array {
   static Status FromArrays(const Array& offsets, const Array& values, MemoryPool* pool,
                            std::shared_ptr<Array>* out);
 
+  const ListType* list_type() const;
+
   /// \brief Return array object containing the list's values
   std::shared_ptr<Array> values() const;
 
diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h
index 67d579d..3bc930c 100644
--- a/cpp/src/arrow/array/builder_binary.h
+++ b/cpp/src/arrow/array/builder_binary.h
@@ -88,6 +88,10 @@ class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
     UnsafeAppend(value.c_str(), static_cast<int32_t>(value.size()));
   }
 
+  void UnsafeAppend(util::string_view value) {
+    UnsafeAppend(value.data(), static_cast<int32_t>(value.size()));
+  }
+
   void UnsafeAppendNull() {
     const int64_t num_bytes = value_data_builder_.length();
     offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index f02bc21..6f44c01 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -92,6 +92,7 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
                 DEPENDENCIES
                 ${GTEST_LIBRARY}
                 flight_grpc_gen
+                arrow_dependencies
                 SHARED_LINK_LIBS
                 arrow_shared
                 arrow_flight_shared
diff --git a/cpp/src/arrow/json/CMakeLists.txt b/cpp/src/arrow/json/CMakeLists.txt
new file mode 100644
index 0000000..a244b8c
--- /dev/null
+++ b/cpp/src/arrow/json/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_arrow_test(parser-test PREFIX "arrow-json")
+
+# TODO(wesm): ARROW-694 this fails valgrind
+# add_arrow_test(chunker-test PREFIX "arrow-json")
+
+add_arrow_benchmark(parser-benchmark PREFIX "arrow-json")
+
+arrow_install_all_headers("arrow/json")
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/api.h
new file mode 100644
index 0000000..00fbc2e
--- /dev/null
+++ b/cpp/src/arrow/json/api.h
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_API_H
+#define ARROW_JSON_API_H
+
+#include "arrow/json/options.h"
+#include "arrow/json/reader.h"
+
+#endif  // ARROW_JSON_API_H
diff --git a/cpp/src/arrow/json/chunker-test.cc b/cpp/src/arrow/json/chunker-test.cc
new file mode 100644
index 0000000..36e2953
--- /dev/null
+++ b/cpp/src/arrow/json/chunker-test.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <numeric>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/json/chunker.h"
+#include "arrow/json/options.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace json {
+
+// Use no nested objects and no string literals containing braces in this test.
+// This way the positions of '{' and '}' can be used as simple proxies
+// for object begin/end.
+
+using util::string_view;
+
+template <typename Lines>
+std::string join(Lines&& lines, std::string delimiter) {
+  std::string joined;
+  for (const auto& line : lines) {
+    joined += line + delimiter;
+  }
+  return joined;
+}
+
+std::string PrettyPrint(std::string one_line) {
+  rapidjson::Document document;
+  document.ParseInsitu(const_cast<char*>(one_line.data()));
+  rapidjson::StringBuffer sb;
+  rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
+  document.Accept(writer);
+  return sb.GetString();
+}
+
+bool WhitespaceOnly(string_view s) {
+  return s.find_first_not_of(" \t\r\n") == string_view::npos;
+}
+
+std::size_t ConsumeWholeObject(string_view* str) {
+  auto fail = [str] {
+    *str = string_view();
+    return string_view::npos;
+  };
+  if (WhitespaceOnly(*str)) return fail();
+  auto open_brace = str->find_first_not_of(" \t\r\n");
+  if (str->at(open_brace) != '{') return fail();
+  auto close_brace = str->find_first_of("}");
+  if (close_brace == string_view::npos) return fail();
+  if (str->at(close_brace) != '}') return fail();
+  auto length = close_brace + 1;
+  *str = str->substr(length);
+  return length;
+}
+
+void AssertWholeObjects(Chunker& chunker, string_view block, int expected_count) {
+  string_view whole;
+  ASSERT_OK(chunker.Process(block, &whole));
+  int count = 0;
+  while (!WhitespaceOnly(whole)) {
+    if (ConsumeWholeObject(&whole) == string_view::npos) FAIL();
+    ++count;
+  }
+  ASSERT_EQ(count, expected_count);
+}
+
+void AssertChunking(Chunker& chunker, std::string str, int total_count) {
+  // First chunkize whole JSON block
+  AssertWholeObjects(chunker, str, total_count);
+
+  // Then chunkize incomplete substrings of the block
+  for (int i = 0; i != total_count; ++i) {
+    // ensure shearing the closing brace off the last object causes it to be chunked out
+    string_view str_view(str);
+    auto last_brace = str_view.find_last_of('}');
+    AssertWholeObjects(chunker, str.substr(0, last_brace), total_count - i - 1);
+
+    // ensure skipping one object reduces the count by one
+    ASSERT_NE(ConsumeWholeObject(&str_view), string_view::npos);
+    str = str_view.to_string();
+    AssertWholeObjects(chunker, str, total_count - i - 1);
+  }
+}
+
+void AssertStraddledChunking(Chunker& chunker, string_view str) {
+  auto first_half = str.substr(0, str.size() / 2).to_string();
+  auto second_half = str.substr(str.size() / 2);
+  AssertChunking(chunker, first_half, 1);
+  string_view first_whole;
+  ASSERT_OK(chunker.Process(first_half, &first_whole));
+  ASSERT_TRUE(string_view(first_half).starts_with(first_whole));
+  auto partial = string_view(first_half).substr(first_whole.size());
+  string_view completion;
+  ASSERT_OK(chunker.Process(partial, second_half, &completion));
+  ASSERT_TRUE(second_half.starts_with(completion));
+  auto straddling = partial.to_string() + completion.to_string();
+  string_view straddling_view(straddling);
+  auto length = ConsumeWholeObject(&straddling_view);
+  ASSERT_NE(length, string_view::npos);
+  ASSERT_NE(length, 0);
+  auto final_whole = second_half.substr(completion.size());
+  length = ConsumeWholeObject(&final_whole);
+  ASSERT_NE(length, string_view::npos);
+  ASSERT_NE(length, 0);
+}
+
+std::unique_ptr<Chunker> MakeChunker(bool newlines_in_values) {
+  auto options = ParseOptions::Defaults();
+  options.newlines_in_values = newlines_in_values;
+  return Chunker::Make(options);
+}
+
+class BaseChunkerTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { chunker_ = MakeChunker(GetParam()); }
+
+  std::unique_ptr<Chunker> chunker_;
+};
+
+INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
+
+INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values(false));
+
+constexpr auto object_count = 3;
+const std::vector<std::string>& lines() {
+  static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})",
+                                             R"({"0":"def","1":"","2":"gh"})",
+                                             R"({"0":"","1":"ij","2":"kl"})"};
+  return l;
+}
+
+TEST_P(BaseChunkerTest, Basics) {
+  AssertChunking(*chunker_, join(lines(), "\n"), object_count);
+}
+
+TEST_P(BaseChunkerTest, Empty) {
+  AssertChunking(*chunker_, "\n", 0);
+  AssertChunking(*chunker_, "\n\n", 0);
+}
+
+TEST(ChunkerTest, PrettyPrinted) {
+  std::string pretty[object_count];
+  std::transform(std::begin(lines()), std::end(lines()), std::begin(pretty), PrettyPrint);
+  auto chunker = MakeChunker(true);
+  AssertChunking(*chunker, join(pretty, "\n"), object_count);
+}
+
+TEST(ChunkerTest, SingleLine) {
+  auto chunker = MakeChunker(true);
+  AssertChunking(*chunker, join(lines(), ""), object_count);
+}
+
+TEST_P(BaseChunkerTest, Straddling) {
+  AssertStraddledChunking(*chunker_, join(lines(), "\n"));
+}
+
+TEST(ChunkerTest, StraddlingPrettyPrinted) {
+  std::string pretty[object_count];
+  std::transform(std::begin(lines()), std::end(lines()), std::begin(pretty), PrettyPrint);
+  auto chunker = MakeChunker(true);
+  AssertStraddledChunking(*chunker, join(pretty, "\n"));
+}
+
+TEST(ChunkerTest, StraddlingSingleLine) {
+  auto chunker = MakeChunker(true);
+  AssertStraddledChunking(*chunker, join(lines(), ""));
+}
+
+TEST_P(BaseChunkerTest, StraddlingEmpty) {
+  auto joined = join(lines(), "\n");
+  auto first = string_view(joined).substr(0, lines()[0].size() + 1);
+  auto rest = string_view(joined).substr(first.size());
+  string_view first_whole;
+  ASSERT_OK(chunker_->Process(first, &first_whole));
+  auto partial = first.substr(first_whole.size());
+  string_view completion;
+  ASSERT_OK(chunker_->Process(partial, rest, &completion));
+  ASSERT_EQ(completion.size(), 0);
+}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc
new file mode 100644
index 0000000..02daa06
--- /dev/null
+++ b/cpp/src/arrow/json/chunker.cc
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/chunker.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#if defined(ARROW_HAVE_SSE4_2)
+#define RAPIDJSON_SSE42 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+#if defined(ARROW_HAVE_SSE2)
+#define RAPIDJSON_SSE2 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/stl.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace json {
+
+using internal::make_unique;
+using util::string_view;
+
+Status StraddlingTooLarge() {
+  return Status::Invalid("straddling object straddles two block boundaries");
+}
+
+std::size_t ConsumeWhitespace(string_view* str) {
+#if defined(ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD)
+  auto nonws_begin =
+      rapidjson::SkipWhitespace_SIMD(str->data(), str->data() + str->size());
+  auto ws_count = nonws_begin - str->data();
+  *str = str->substr(ws_count);
+  return static_cast<std::size_t>(ws_count);
+#undef ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD
+#else
+  auto ws_count = str->find_first_not_of(" \t\r\n");
+  *str = str->substr(ws_count);
+  return ws_count;
+#endif
+}
+
+class NewlinesStrictlyDelimitChunker : public Chunker {
+ public:
+  Status Process(string_view block, string_view* chunked) override {
+    auto last_newline = block.find_last_of("\n\r");
+    if (last_newline == string_view::npos) {
+      // no newlines in this block, return empty chunk
+      *chunked = string_view();
+    } else {
+      *chunked = block.substr(0, last_newline + 1);
+    }
+    return Status::OK();
+  }
+
+  Status Process(string_view partial, string_view block,
+                 string_view* completion) override {
+    ConsumeWhitespace(&partial);
+    if (partial.size() == 0) {
+      // if partial is empty, don't bother looking for completion
+      *completion = string_view();
+      return Status::OK();
+    }
+    auto first_newline = block.find_first_of("\n\r");
+    if (first_newline == string_view::npos) {
+      // no newlines in this block; straddling object straddles *two* block boundaries.
+      // retry with larger buffer
+      return StraddlingTooLarge();
+    }
+    *completion = block.substr(0, first_newline + 1);
+    return Status::OK();
+  }
+};
+
+/// RapidJson custom stream for reading JSON stored in multiple buffers
+/// http://rapidjson.org/md_doc_stream.html#CustomStream
+class MultiStringStream {
+ public:
+  using Ch = char;
+  explicit MultiStringStream(std::vector<string_view> strings)
+      : strings_(std::move(strings)) {
+    std::remove(strings_.begin(), strings_.end(), string_view(""));
+    std::reverse(strings_.begin(), strings_.end());
+  }
+  char Peek() const {
+    if (strings_.size() == 0) return '\0';
+    return strings_.back()[0];
+  }
+  char Take() {
+    if (strings_.size() == 0) return '\0';
+    char taken = strings_.back()[0];
+    if (strings_.back().size() == 1) {
+      strings_.pop_back();
+    } else {
+      strings_.back() = strings_.back().substr(1);
+    }
+    ++index_;
+    return taken;
+  }
+  std::size_t Tell() { return index_; }
+  void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
+  void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
+  char* PutBegin() {
+    ARROW_LOG(FATAL) << "not implemented";
+    return nullptr;
+  }
+  std::size_t PutEnd(char*) {
+    ARROW_LOG(FATAL) << "not implemented";
+    return 0;
+  }
+
+ private:
+  std::size_t index_ = 0;
+  std::vector<string_view> strings_;
+};
+
+template <typename Stream>
+std::size_t ConsumeWholeObject(Stream&& stream) {
+  static constexpr unsigned parse_flags = rapidjson::kParseIterativeFlag |
+                                          rapidjson::kParseStopWhenDoneFlag |
+                                          rapidjson::kParseNumbersAsStringsFlag;
+  rapidjson::BaseReaderHandler<rapidjson::UTF8<>> handler;
+  rapidjson::Reader reader;
+  // parse a single JSON object
+  switch (reader.Parse<parse_flags>(stream, handler).Code()) {
+    case rapidjson::kParseErrorNone:
+      return stream.Tell();
+    case rapidjson::kParseErrorDocumentEmpty:
+      return 0;
+    default:
+      // rapidjson emitted an error, the most recent object was partial
+      return string_view::npos;
+  }
+}
+
+class ParsingChunker : public Chunker {
+ public:
+  Status Process(string_view block, string_view* chunked) override {
+    if (block.size() == 0) {
+      *chunked = string_view();
+      return Status::OK();
+    }
+    std::size_t total_length = 0;
+    for (auto consumed = block;; consumed = block.substr(total_length)) {
+      auto length = ConsumeWholeObject(rapidjson::StringStream(consumed.data()));
+      if (length == string_view::npos || length == 0) {
+        // found incomplete object or consumed is empty
+        break;
+      }
+      if (length > consumed.size()) {
+        total_length += consumed.size();
+        break;
+      }
+      total_length += length;
+    }
+    *chunked = block.substr(0, total_length);
+    return Status::OK();
+  }
+
+  Status Process(string_view partial, string_view block,
+                 string_view* completion) override {
+    ConsumeWhitespace(&partial);
+    if (partial.size() == 0) {
+      // if partial is empty, don't bother looking for completion
+      *completion = string_view();
+      return Status::OK();
+    }
+    auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
+    if (length == string_view::npos) {
+      // straddling object straddles *two* block boundaries.
+      // retry with larger buffer
+      return StraddlingTooLarge();
+    }
+    *completion = block.substr(0, length - partial.size());
+    return Status::OK();
+  }
+};
+
+std::unique_ptr<Chunker> Chunker::Make(ParseOptions options) {
+  if (!options.newlines_in_values) {
+    return make_unique<NewlinesStrictlyDelimitChunker>();
+  }
+  return make_unique<ParsingChunker>();
+}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h
new file mode 100644
index 0000000..1e71dd4
--- /dev/null
+++ b/cpp/src/arrow/json/chunker.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_CHUNKER_H
+#define ARROW_JSON_CHUNKER_H
+
+#include <memory>
+
+#include "arrow/json/options.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/sse-util.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+
+/// \class Chunker
+/// \brief A reusable block-based chunker for JSON data
+///
+/// The chunker takes a block of JSON data and finds a suitable place
+/// to cut it up without splitting an object.
+class ARROW_EXPORT Chunker {
+ public:
+  virtual ~Chunker() = default;
+
+  /// \brief Carve up a chunk in a block of data to contain only whole objects
+  /// \param[in] block json data to be chunked, must end with '\0'
+  /// \param[out] chunked subrange of block containing whole json objects
+  virtual Status Process(util::string_view block, util::string_view* chunked) = 0;
+
+  /// \brief Carve the completion of a partial object out of a block
+  /// \param[in] partial incomplete json object
+  /// \param[in] block json data
+  /// \param[out] completion subrange of block contining the completion of partial
+  virtual Status Process(util::string_view partial, util::string_view block,
+                         util::string_view* completion) = 0;
+
+  static std::unique_ptr<Chunker> Make(ParseOptions options);
+
+ protected:
+  Chunker() = default;
+  ARROW_DISALLOW_COPY_AND_ASSIGN(Chunker);
+};
+
+}  // namespace json
+}  // namespace arrow
+
+#endif  // ARROW_JSON_CHUNKER_H
diff --git a/cpp/src/arrow/json/options.cc b/cpp/src/arrow/json/options.cc
new file mode 100644
index 0000000..dc5e628
--- /dev/null
+++ b/cpp/src/arrow/json/options.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/options.h"
+
+namespace arrow {
+namespace json {
+
+ParseOptions ParseOptions::Defaults() { return ParseOptions(); }
+
+ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h
new file mode 100644
index 0000000..0ec1b29
--- /dev/null
+++ b/cpp/src/arrow/json/options.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_OPTIONS_H
+#define ARROW_JSON_OPTIONS_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class DataType;
+
+namespace json {
+
+enum class UnexpectedFieldBehavior : char { Ignore, Error, InferType };
+
+struct ARROW_EXPORT ParseOptions {
+  // Parsing options
+
+  // Optional explicit schema (no type inference, ignores other fields)
+  std::shared_ptr<Schema> explicit_schema;
+
+  // Whether objects may be printed across multiple lines (for example pretty printed)
+  // NB: if false, input must end with an empty line
+  bool newlines_in_values = false;
+
+  // How should parse handle fields outside the explicit_schema?
+  UnexpectedFieldBehavior unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  static ParseOptions Defaults();
+};
+
+struct ARROW_EXPORT ReadOptions {
+  // Reader options
+
+  // Whether to use the global CPU thread pool
+  bool use_threads = true;
+  // Block size we request from the IO layer; also determines the size of
+  // chunks when use_threads is true
+  int32_t block_size = 1 << 20;  // 1 MB
+
+  static ReadOptions Defaults();
+};
+
+}  // namespace json
+}  // namespace arrow
+
+#endif  // ARROW_JSON_OPTIONS_H
diff --git a/cpp/src/arrow/json/parser-benchmark.cc b/cpp/src/arrow/json/parser-benchmark.cc
new file mode 100644
index 0000000..82f0b33
--- /dev/null
+++ b/cpp/src/arrow/json/parser-benchmark.cc
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <iostream>
+#include <string>
+
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
+#include "arrow/json/test-common.h"
+#include "arrow/testing/gtest_util.h"
+
+namespace arrow {
+namespace json {
+
+static void BenchmarkJSONParsing(benchmark::State& state,  // NOLINT non-const reference
+                                 const std::string& json, int32_t num_rows,
+                                 ParseOptions options) {
+  for (auto _ : state) {
+    std::shared_ptr<Buffer> src;
+    ABORT_NOT_OK(MakeBuffer(json, &src));
+    BlockParser parser(options, src);
+    ABORT_NOT_OK(parser.Parse(src));
+    if (parser.num_rows() != num_rows) {
+      std::cerr << "Parsing incomplete\n";
+      std::abort();
+    }
+    std::shared_ptr<Array> parsed;
+    ABORT_NOT_OK(parser.Finish(&parsed));
+  }
+  state.SetBytesProcessed(state.iterations() * json.size());
+}
+
+static void BM_ParseJSONBlockWithSchema(
+    benchmark::State& state) {  // NOLINT non-const reference
+  const int32_t num_rows = 5000;
+  auto options = ParseOptions::Defaults();
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+  std::mt19937_64 engine;
+  std::string json;
+  for (int i = 0; i != num_rows; ++i) {
+    StringBuffer sb;
+    Writer writer(sb);
+    ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+    json += sb.GetString();
+    json += "\n";
+  }
+  BenchmarkJSONParsing(state, json, num_rows, options);
+}
+
+BENCHMARK(BM_ParseJSONBlockWithSchema)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/parser-test.cc b/cpp/src/arrow/json/parser-test.cc
new file mode 100644
index 0000000..c3af4a7
--- /dev/null
+++ b/cpp/src/arrow/json/parser-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <iomanip>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/ipc/json-simple.h"
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
+#include "arrow/json/reader.h"
+#include "arrow/json/test-common.h"
+#include "arrow/status.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+
+using util::string_view;
+
+namespace json {
+
+std::string scalars_only_src() {
+  return R"(
+    { "hello": 3.5, "world": false, "yo": "thing" }
+    { "hello": 3.2, "world": null }
+    { "hello": 3.4, "world": null, "yo": "\u5fcd" }
+    { "hello": 0.0, "world": true, "yo": null }
+  )";
+}
+
+std::string nested_src() {
+  return R"(
+    { "hello": 3.5, "world": false, "yo": "thing", "arr": [1, 2, 3], "nuf": {} }
+    { "hello": 3.2, "world": null, "arr": [2], "nuf": null }
+    { "hello": 3.4, "world": null, "yo": "\u5fcd", "arr": [], "nuf": { "ps": 78 } }
+    { "hello": 0.0, "world": true, "yo": null, "arr": null, "nuf": { "ps": 90 } }
+  )";
+}
+
+void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual);
+
+void AssertRawArraysEqual(const Array& expected, const Array& actual) {
+  switch (actual.type_id()) {
+    case Type::BOOL:
+    case Type::NA:
+      return AssertArraysEqual(expected, actual);
+    case Type::DICTIONARY: {
+      ASSERT_EQ(expected.type_id(), Type::STRING);
+      std::shared_ptr<Array> actual_decoded;
+      ASSERT_OK(DecodeStringDictionary(static_cast<const DictionaryArray&>(actual),
+                                       &actual_decoded));
+      return AssertArraysEqual(expected, *actual_decoded);
+    }
+    case Type::LIST: {
+      ASSERT_EQ(expected.type_id(), Type::LIST);
+      AssertBufferEqual(*expected.null_bitmap(), *actual.null_bitmap());
+      const auto& expected_offsets = expected.data()->buffers[1];
+      const auto& actual_offsets = actual.data()->buffers[1];
+      AssertBufferEqual(*expected_offsets, *actual_offsets);
+      auto expected_values = static_cast<const ListArray&>(expected).values();
+      auto actual_values = static_cast<const ListArray&>(actual).values();
+      return AssertRawArraysEqual(*expected_values, *actual_values);
+    }
+    case Type::STRUCT:
+      ASSERT_EQ(expected.type_id(), Type::STRUCT);
+      return AssertRawStructArraysEqual(static_cast<const StructArray&>(expected),
+                                        static_cast<const StructArray&>(actual));
+    default:
+      FAIL();
+  }
+}
+
+void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual) {
+  ASSERT_EQ(expected.num_fields(), actual.num_fields());
+  for (int i = 0; i != expected.num_fields(); ++i) {
+    auto expected_name = expected.type()->child(i)->name();
+    auto actual_name = actual.type()->child(i)->name();
+    ASSERT_EQ(expected_name, actual_name);
+    AssertRawArraysEqual(*expected.field(i), *actual.field(i));
+  }
+}
+
+void AssertParseColumns(ParseOptions options, string_view src_str,
+                        const std::vector<std::shared_ptr<Field>>& fields,
+                        const std::vector<std::string>& columns_json) {
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer(src_str, &src));
+  BlockParser parser(options, src);
+  ASSERT_OK(parser.Parse(src));
+  std::shared_ptr<Array> parsed;
+  ASSERT_OK(parser.Finish(&parsed));
+  auto struct_array = std::static_pointer_cast<StructArray>(parsed);
+  for (size_t i = 0; i != fields.size(); ++i) {
+    // std::shared_ptr<Array> column_expected;
+    // ASSERT_OK(ArrayFromJSON(fields[i]->type(), columns_json[i], &column_expected));
+    auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
+    auto column = struct_array->GetFieldByName(fields[i]->name());
+    AssertRawArraysEqual(*column_expected, *column);
+  }
+}
+
+// TODO(bkietz) parameterize (at least some of) these tests over UnexpectedFieldBehavior
+
+TEST(BlockParserWithSchema, Basics) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema =
+      schema({field("hello", float64()), field("world", boolean()), field("yo", utf8())});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  AssertParseColumns(
+      options, scalars_only_src(),
+      {field("hello", utf8()), field("world", boolean()), field("yo", utf8())},
+      {"[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]", "[false, null, null, true]",
+       "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
+}
+
+TEST(BlockParserWithSchema, Empty) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema =
+      schema({field("hello", float64()), field("world", boolean()), field("yo", utf8())});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  AssertParseColumns(
+      options, "",
+      {field("hello", utf8()), field("world", boolean()), field("yo", utf8())},
+      {"[]", "[]", "[]"});
+}
+
+TEST(BlockParserWithSchema, SkipFieldsOutsideSchema) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema = schema({field("hello", float64()), field("yo", utf8())});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  AssertParseColumns(options, scalars_only_src(),
+                     {field("hello", utf8()), field("yo", utf8())},
+                     {"[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]",
+                      "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
+}
+
+TEST(BlockParserWithSchema, FailOnInconvertible) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema = schema({field("a", int32())});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer("{\"a\":0}\n{\"a\":true}", &src));
+  BlockParser parser(options, src);
+  ASSERT_RAISES(Invalid, parser.Parse(src));
+}
+
+TEST(BlockParserWithSchema, Nested) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema = schema({field("yo", utf8()), field("arr", list(int32())),
+                                    field("nuf", struct_({field("ps", int32())}))});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  AssertParseColumns(options, nested_src(),
+                     {field("yo", utf8()), field("arr", list(utf8())),
+                      field("nuf", struct_({field("ps", utf8())}))},
+                     {"[\"thing\", null, \"\xe5\xbf\x8d\", null]",
+                      R"([["1", "2", "3"], ["2"], [], null])",
+                      R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"});
+}
+
+TEST(BlockParserWithSchema, FailOnIncompleteJson) {
+  auto options = ParseOptions::Defaults();
+  options.explicit_schema = schema({field("a", int32())});
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer("{\"a\":0, \"b\"", &src));
+  BlockParser parser(options, src);
+  ASSERT_RAISES(Invalid, parser.Parse(src));
+}
+
+TEST(BlockParser, Basics) {
+  auto options = ParseOptions::Defaults();
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  AssertParseColumns(
+      options, scalars_only_src(),
+      {field("hello", utf8()), field("world", boolean()), field("yo", utf8())},
+      {"[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]", "[false, null, null, true]",
+       "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
+}
+
+TEST(BlockParser, Nested) {
+  auto options = ParseOptions::Defaults();
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  AssertParseColumns(options, nested_src(),
+                     {field("yo", utf8()), field("arr", list(utf8())),
+                      field("nuf", struct_({field("ps", utf8())}))},
+                     {"[\"thing\", null, \"\xe5\xbf\x8d\", null]",
+                      R"([["1", "2", "3"], ["2"], [], null])",
+                      R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"});
+}
+
+void AssertParseOne(ParseOptions options, string_view src_str,
+                    const std::vector<std::shared_ptr<Field>>& fields,
+                    const std::vector<std::string>& columns_json) {
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer(src_str, &src));
+  std::shared_ptr<RecordBatch> parsed;
+  ASSERT_OK(ParseOne(options, src, &parsed));
+  for (size_t i = 0; i != fields.size(); ++i) {
+    auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
+    auto column = parsed->GetColumnByName(fields[i]->name());
+    AssertArraysEqual(*column_expected, *column);
+  }
+}
+
+TEST(ParseOne, Basics) {
+  auto options = ParseOptions::Defaults();
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  AssertParseOne(
+      options, scalars_only_src(),
+      {field("hello", float64()), field("world", boolean()), field("yo", utf8())},
+      {"[3.5, 3.2, 3.4, 0.0]", "[false, null, null, true]",
+       "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
+}
+
+TEST(ParseOne, Nested) {
+  auto options = ParseOptions::Defaults();
+  options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  AssertParseOne(
+      options, nested_src(),
+      {field("yo", utf8()), field("arr", list(int64())),
+       field("nuf", struct_({field("ps", int64())}))},
+      {"[\"thing\", null, \"\xe5\xbf\x8d\", null]", R"([[1, 2, 3], [2], [], null])",
+       R"([{"ps":null}, null, {"ps":78}, {"ps":90}])"});
+}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/parser.cc b/cpp/src/arrow/json/parser.cc
new file mode 100644
index 0000000..6182495
--- /dev/null
+++ b/cpp/src/arrow/json/parser.cc
@@ -0,0 +1,1034 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/parser.h"
+
+#include <algorithm>
+#include <cstdio>
+#include <limits>
+#include <sstream>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/array.h"
+#include "arrow/buffer-builder.h"
+#include "arrow/builder.h"
+#include "arrow/csv/converter.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/stl.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace json {
+
+using internal::checked_cast;
+using util::string_view;
+
+template <typename... T>
+Status ParseError(T&&... t) {
+  return Status::Invalid("JSON parse error: ", std::forward<T>(t)...);
+}
+
+Status KindChangeError(Kind::type from, Kind::type to) {
+  auto from_name = Tag(from)->value(0);
+  auto to_name = Tag(to)->value(0);
+  return ParseError("A column changed from ", from_name, " to ", to_name);
+}
+
+/// Similar to StringBuilder, but appends bytes into the provided buffer without
+/// resizing. This builder does not support appending nulls.
+class UnsafeStringBuilder {
+ public:
+  UnsafeStringBuilder(MemoryPool* pool, const std::shared_ptr<Buffer>& buffer)
+      : offsets_builder_(pool), values_buffer_(buffer) {
+    DCHECK_NE(values_buffer_, nullptr);
+  }
+
+  Status Append(string_view str) {
+    DCHECK_LE(static_cast<int64_t>(str.size()), capacity() - values_end_);
+    RETURN_NOT_OK(AppendNextOffset());
+    std::memcpy(values_buffer_->mutable_data() + values_end_, str.data(), str.size());
+    length_ += 1;
+    values_end_ += str.size();
+    return Status::OK();
+  }
+
+  // Builder may not be reused after Finish()
+  Status Finish(std::shared_ptr<Array>* out, int64_t* values_length = nullptr) && {
+    RETURN_NOT_OK(AppendNextOffset());
+    if (values_length) {
+      *values_length = values_end_;
+    }
+    std::shared_ptr<Buffer> offsets;
+    RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
+    auto data = ArrayData::Make(utf8(), length_, {nullptr, offsets, values_buffer_}, 0);
+    *out = MakeArray(data);
+    return Status::OK();
+  }
+
+  int64_t length() { return length_; }
+
+  int64_t capacity() { return values_buffer_->size(); }
+
+  int64_t remaining_capacity() { return values_buffer_->size() - values_end_; }
+
+ private:
+  Status AppendNextOffset() {
+    return offsets_builder_.Append(static_cast<int32_t>(values_end_));
+  }
+
+  int64_t length_ = 0;
+  int64_t values_end_ = 0;
+  TypedBufferBuilder<int32_t> offsets_builder_;
+  std::shared_ptr<Buffer> values_buffer_;
+};
+
+/// Store a stack of bitsets efficiently. The top bitset may be accessed and its bits may
+/// be modified, but it may not be resized.
+class BitsetStack {
+ public:
+  using reference = typename std::vector<bool>::reference;
+
+  void Push(int size, bool value) {
+    offsets_.push_back(bit_count());
+    bits_.resize(bit_count() + size, value);
+  }
+
+  int TopSize() const { return bit_count() - offsets_.back(); }
+
+  void Pop() {
+    bits_.resize(offsets_.back());
+    offsets_.pop_back();
+  }
+
+  reference operator[](int i) { return bits_[offsets_.back() + i]; }
+
+  bool operator[](int i) const { return bits_[offsets_.back() + i]; }
+
+ private:
+  int bit_count() const { return static_cast<int>(bits_.size()); }
+  std::vector<bool> bits_;
+  std::vector<int> offsets_;
+};
+
+/// \brief ArrayBuilder for parsed but unconverted arrays
+template <Kind::type>
+class RawArrayBuilder;
+
+/// \brief packed pointer to a RawArrayBuilder
+///
+/// RawArrayBuilders are stored in HandlerBase,
+/// which allows storage of their indices (uint32_t) instead of a full pointer.
+/// BuilderPtr is also tagged with the json kind and nullable properties
+/// so those can be accessed before dereferencing the builder.
+struct BuilderPtr {
+  BuilderPtr() : BuilderPtr(BuilderPtr::null) {}
+  BuilderPtr(Kind::type k, uint32_t i, bool n) : index(i), kind(k), nullable(n) {}
+
+  BuilderPtr(const BuilderPtr&) = default;
+  BuilderPtr& operator=(const BuilderPtr&) = default;
+  BuilderPtr(BuilderPtr&&) = default;
+  BuilderPtr& operator=(BuilderPtr&&) = default;
+
+  // index of builder in its arena
+  // OR the length of that builder if kind == Kind::kNull
+  // (we don't allocate an arena for nulls since they're trivial)
+  // FIXME(bkietz) GCC is emitting conversion errors for the bitfields
+  uint32_t index;  // : 28;
+  Kind::type kind;
+  bool nullable;
+
+  bool operator==(BuilderPtr other) const {
+    return kind == other.kind && index == other.index;
+  }
+
+  bool operator!=(BuilderPtr other) const { return !(other == *this); }
+
+  operator bool() const { return *this != null; }
+
+  bool operator!() const { return *this == null; }
+
+  static const BuilderPtr null;
+};
+
+const BuilderPtr BuilderPtr::null(Kind::kNull, 0, false);
+
+template <>
+class RawArrayBuilder<Kind::kBoolean> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool)
+      : data_builder_(pool), null_bitmap_builder_(pool) {}
+
+  Status Append(bool value) {
+    RETURN_NOT_OK(data_builder_.Append(value));
+    return null_bitmap_builder_.Append(true);
+  }
+
+  Status AppendNull() {
+    RETURN_NOT_OK(data_builder_.Append(false));
+    return null_bitmap_builder_.Append(false);
+  }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(data_builder_.Append(count, false));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  Status Finish(std::shared_ptr<Array>* out) {
+    auto size = length();
+    auto null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> data, null_bitmap;
+    RETURN_NOT_OK(data_builder_.Finish(&data));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
+    *out = MakeArray(ArrayData::Make(boolean(), size, {null_bitmap, data}, null_count));
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  TypedBufferBuilder<bool> data_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+/// \brief builder for strings or unconverted numbers
+///
+/// Both of these are represented in the builder as an index only;
+/// the actual characters are stored in a single StringArray (into which
+/// an index refers). This means building is faster since we don't do
+/// allocation for string/number characters but accessing is strided.
+///
+/// On completion the indices and the character storage are combined into
+/// a DictionaryArray, which is a convenient container for indices referring
+/// into another array.
+class ScalarBuilder {
+ public:
+  explicit ScalarBuilder(MemoryPool* pool)
+      : data_builder_(pool), null_bitmap_builder_(pool) {}
+
+  Status Append(int32_t index) {
+    RETURN_NOT_OK(data_builder_.Append(index));
+    return null_bitmap_builder_.Append(true);
+  }
+
+  Status AppendNull() {
+    RETURN_NOT_OK(data_builder_.Append(0));
+    return null_bitmap_builder_.Append(false);
+  }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(data_builder_.Append(count, 0));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  Status Finish(std::shared_ptr<Array>* out) {
+    auto size = length();
+    auto null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> data, null_bitmap;
+    RETURN_NOT_OK(data_builder_.Finish(&data));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
+    *out = MakeArray(ArrayData::Make(int32(), size, {null_bitmap, data}, null_count));
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+  // TODO(bkietz) track total length of bytes for later simpler allocation
+
+ private:
+  TypedBufferBuilder<int32_t> data_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+template <>
+class RawArrayBuilder<Kind::kNumber> : public ScalarBuilder {
+ public:
+  using ScalarBuilder::ScalarBuilder;
+};
+
+template <>
+class RawArrayBuilder<Kind::kString> : public ScalarBuilder {
+ public:
+  using ScalarBuilder::ScalarBuilder;
+};
+
+template <>
+class RawArrayBuilder<Kind::kArray> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool)
+      : offset_builder_(pool), null_bitmap_builder_(pool) {}
+
+  Status Append(int32_t child_length) {
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    offset_ += child_length;
+    return null_bitmap_builder_.Append(true);
+  }
+
+  Status AppendNull() {
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    return null_bitmap_builder_.Append(false);
+  }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(offset_builder_.Append(count, offset_));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  template <typename HandlerBase>
+  Status Finish(HandlerBase& handler, std::shared_ptr<Array>* out) {
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    auto size = length();
+    auto null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> offsets, null_bitmap;
+    RETURN_NOT_OK(offset_builder_.Finish(&offsets));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
+    std::shared_ptr<Array> values;
+    RETURN_NOT_OK(handler.Finish(value_builder_, &values));
+    auto type = list(
+        field("item", values->type(), value_builder_.nullable, Tag(value_builder_.kind)));
+    *out = MakeArray(ArrayData::Make(type, size, {null_bitmap, offsets}, {values->data()},
+                                     null_count));
+    return Status::OK();
+  }
+
+  BuilderPtr value_builder() const { return value_builder_; }
+
+  void value_builder(BuilderPtr builder) { value_builder_ = builder; }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  BuilderPtr value_builder_ = BuilderPtr::null;
+  int32_t offset_ = 0;
+  TypedBufferBuilder<int32_t> offset_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+template <>
+class RawArrayBuilder<Kind::kObject> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool) : null_bitmap_builder_(pool) {}
+
+  Status Append() { return null_bitmap_builder_.Append(true); }
+
+  Status AppendNull() { return null_bitmap_builder_.Append(false); }
+
+  Status AppendNull(int64_t count) { return null_bitmap_builder_.Append(count, false); }
+
+  int GetFieldIndex(const std::string& name) const {
+    auto it = name_to_index_.find(name);
+    if (it == name_to_index_.end()) {
+      return -1;
+    }
+    return it->second;
+  }
+
+  int AddField(std::string name, BuilderPtr builder) {
+    auto index = num_fields();
+    field_builders_.push_back(builder);
+    name_to_index_.emplace(std::move(name), index);
+    return index;
+  }
+
+  int num_fields() const { return static_cast<int>(field_builders_.size()); }
+
+  BuilderPtr field_builder(int index) const { return field_builders_[index]; }
+
+  void field_builder(int index, BuilderPtr builder) { field_builders_[index] = builder; }
+
+  template <typename HandlerBase>
+  Status Finish(HandlerBase& handler, std::shared_ptr<Array>* out) {
+    auto size = length();
+    auto null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> null_bitmap;
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
+
+    std::vector<string_view> field_names(num_fields());
+    for (const auto& name_index : name_to_index_) {
+      field_names[name_index.second] = name_index.first;
+    }
+
+    std::vector<std::shared_ptr<Field>> fields(num_fields());
+    std::vector<std::shared_ptr<ArrayData>> child_data(num_fields());
+    for (int i = 0; i != num_fields(); ++i) {
+      std::shared_ptr<Array> values;
+      RETURN_NOT_OK(handler.Finish(field_builders_[i], &values));
+      child_data[i] = values->data();
+      fields[i] = field(field_names[i].to_string(), values->type(),
+                        field_builders_[i].nullable, Tag(field_builders_[i].kind));
+    }
+
+    *out = MakeArray(ArrayData::Make(struct_(std::move(fields)), size, {null_bitmap},
+                                     std::move(child_data), null_count));
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  std::vector<BuilderPtr> field_builders_;
+  std::unordered_map<std::string, int> name_to_index_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+/// Three implementations are provided for BlockParser::Impl, one for each
+/// UnexpectedFieldBehavior. However most of the logic is identical in each
+/// case, so the majority of the implementation is in this base class
+class HandlerBase : public BlockParser::Impl,
+                    public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, HandlerBase> {
+ public:
+  /// Retrieve a pointer to a builder from a BuilderPtr
+  template <Kind::type kind>
+  typename std::enable_if<kind != Kind::kNull, RawArrayBuilder<kind>*>::type Cast(
+      BuilderPtr builder) {
+    DCHECK_EQ(builder.kind, kind);
+    return arena<kind>().data() + builder.index;
+  }
+
+  /// Accessor for a stored error Status
+  Status Error() { return status_; }
+
+  /// \defgroup rapidjson-handler-interface functions expected by rapidjson::Reader
+  ///
+  /// bool Key(const char* data, rapidjson::SizeType size, ...) is omitted since
+  /// the behavior varies greatly between UnexpectedFieldBehaviors
+  ///
+  /// @{
+  bool Null() {
+    status_ = AppendNull();
+    return status_.ok();
+  }
+
+  bool Bool(bool value) {
+    status_ = AppendBool(value);
+    return status_.ok();
+  }
+
+  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+    status_ = AppendScalar<Kind::kNumber>(string_view(data, size));
+    return status_.ok();
+  }
+
+  bool String(const char* data, rapidjson::SizeType size, ...) {
+    status_ = AppendScalar<Kind::kString>(string_view(data, size));
+    return status_.ok();
+  }
+
+  bool StartObject() {
+    status_ = StartObjectImpl();
+    return status_.ok();
+  }
+
+  bool EndObject(...) {
+    status_ = EndObjectImpl();
+    return status_.ok();
+  }
+
+  bool StartArray() {
+    status_ = StartArrayImpl();
+    return status_.ok();
+  }
+
+  bool EndArray(rapidjson::SizeType size) {
+    status_ = EndArrayImpl(size);
+    return status_.ok();
+  }
+  /// @}
+
+  /// \brief Set up builders using an expected Schema
+  Status SetSchema(const Schema& s) {
+    DCHECK_EQ(arena<Kind::kObject>().size(), 1);
+    for (const auto& f : s.fields()) {
+      BuilderPtr field_builder;
+      RETURN_NOT_OK(MakeBuilder(*f->type(), 0, &field_builder));
+      field_builder.nullable = f->nullable();
+      Cast<Kind::kObject>(builder_)->AddField(f->name(), field_builder);
+    }
+    return Status::OK();
+  }
+
+  Status Finish(BuilderPtr builder, std::shared_ptr<Array>* out) {
+    switch (builder.kind) {
+      case Kind::kNull: {
+        auto length = static_cast<int64_t>(builder.index);
+        *out = std::make_shared<NullArray>(length);
+        return Status::OK();
+      }
+      case Kind::kBoolean:
+        return Cast<Kind::kBoolean>(builder)->Finish(out);
+      case Kind::kNumber:
+        return FinishScalar(Cast<Kind::kNumber>(builder), out);
+      case Kind::kString:
+        return FinishScalar(Cast<Kind::kString>(builder), out);
+      case Kind::kArray:
+        return Cast<Kind::kArray>(builder)->Finish(*this, out);
+      case Kind::kObject:
+        return Cast<Kind::kObject>(builder)->Finish(*this, out);
+      default:
+        return Status::NotImplemented("invalid builder kind");
+    }
+  }
+
+  Status Finish(std::shared_ptr<Array>* parsed) override {
+    RETURN_NOT_OK(std::move(scalar_values_builder_).Finish(&scalar_values_));
+    return Finish(builder_, parsed);
+  }
+
+  int32_t num_rows() override { return num_rows_; }
+
+ protected:
+  HandlerBase(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+      : pool_(pool),
+        builder_(Kind::kObject, 0, false),
+        scalar_values_builder_(pool, scalar_storage) {
+    arena<Kind::kObject>().emplace_back(pool_);
+  }
+
+  /// finish a column of scalar values (string or number)
+  Status FinishScalar(ScalarBuilder* builder, std::shared_ptr<Array>* out) {
+    std::shared_ptr<Array> indices;
+    RETURN_NOT_OK(builder->Finish(&indices));
+    return DictionaryArray::FromArrays(dictionary(int32(), scalar_values_), indices, out);
+  }
+
+  template <typename Handler>
+  Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
+    constexpr auto parse_flags =
+        rapidjson::kParseInsituFlag | rapidjson::kParseIterativeFlag |
+        rapidjson::kParseStopWhenDoneFlag | rapidjson::kParseNumbersAsStringsFlag;
+    auto json_data = reinterpret_cast<char*>(json->mutable_data());
+    rapidjson::GenericInsituStringStream<rapidjson::UTF8<>> ss(json_data);
+    rapidjson::Reader reader;
+
+    for (; num_rows_ != kMaxParserNumRows; ++num_rows_) {
+      // parse a single line of JSON
+      auto ok = reader.Parse<parse_flags>(ss, handler);
+      switch (ok.Code()) {
+        case rapidjson::kParseErrorNone:
+          // parse the next object
+          continue;
+        case rapidjson::kParseErrorDocumentEmpty: {
+          // parsed all objects, finish
+          return Status::OK();
+        }
+        case rapidjson::kParseErrorTermination:
+          // handler emitted an error
+          return handler.Error();
+        default:
+          // rapidjson emitted an error
+          return ParseError(rapidjson::GetParseError_En(ok.Code()));
+      }
+    }
+    return Status::Invalid("Exceeded maximum rows");
+  }
+
+  /// construct a builder of staticallly defined kind in arenas_
+  template <Kind::type kind>
+  Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) {
+    builder->index = static_cast<uint32_t>(arena<kind>().size());
+    builder->kind = kind;
+    builder->nullable = true;
+    arena<kind>().emplace_back(pool_);
+    return Cast<kind>(*builder)->AppendNull(leading_nulls);
+  }
+
+  /// construct a builder of whatever kind corresponds to a DataType
+  Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) {
+    Kind::type kind;
+    RETURN_NOT_OK(KindForType(t, &kind));
+    switch (kind) {
+      case Kind::kNull:
+        *builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true);
+        return Status::OK();
+      case Kind::kBoolean:
+        return MakeBuilder<Kind::kBoolean>(leading_nulls, builder);
+      case Kind::kNumber:
+        return MakeBuilder<Kind::kNumber>(leading_nulls, builder);
+      case Kind::kString:
+        return MakeBuilder<Kind::kString>(leading_nulls, builder);
+      case Kind::kArray: {
+        RETURN_NOT_OK(MakeBuilder<Kind::kArray>(leading_nulls, builder));
+        const auto& list_type = static_cast<const ListType&>(t);
+        BuilderPtr value_builder;
+        RETURN_NOT_OK(MakeBuilder(*list_type.value_type(), 0, &value_builder));
+        value_builder.nullable = list_type.value_field()->nullable();
+        Cast<Kind::kArray>(*builder)->value_builder(value_builder);
+        return Status::OK();
+      }
+      case Kind::kObject: {
+        RETURN_NOT_OK(MakeBuilder<Kind::kObject>(leading_nulls, builder));
+        const auto& struct_type = static_cast<const StructType&>(t);
+        for (const auto& f : struct_type.children()) {
+          BuilderPtr field_builder;
+          RETURN_NOT_OK(MakeBuilder(*f->type(), leading_nulls, &field_builder));
+          field_builder.nullable = f->nullable();
+          Cast<Kind::kObject>(*builder)->AddField(f->name(), field_builder);
+        }
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented("invalid builder type");
+    }
+  }
+
+  /// \defgroup handlerbase-append-methods append non-nested values
+  ///
+  /// These methods act on builder_
+  /// @{
+
+  Status AppendNull() {
+    if (ARROW_PREDICT_FALSE(!builder_.nullable)) {
+      return ParseError("a required field was null");
+    }
+    switch (builder_.kind) {
+      case Kind::kNull: {
+        // increment null count stored inline
+        // update the parent, since changing builder_ doesn't affect parent
+        auto parent = builder_stack_.back();
+        if (parent.kind == Kind::kArray) {
+          auto list_builder = Cast<Kind::kArray>(parent);
+          DCHECK_EQ(list_builder->value_builder(), builder_);
+          builder_.index += 1;
+          list_builder->value_builder(builder_);
+        } else {
+          auto struct_builder = Cast<Kind::kObject>(parent);
+          DCHECK_EQ(struct_builder->field_builder(field_index_), builder_);
+          builder_.index += 1;
+          struct_builder->field_builder(field_index_, builder_);
+        }
+        return Status::OK();
+      }
+      case Kind::kBoolean:
+        return Cast<Kind::kBoolean>(builder_)->AppendNull();
+      case Kind::kNumber:
+        return Cast<Kind::kNumber>(builder_)->AppendNull();
+      case Kind::kString:
+        return Cast<Kind::kString>(builder_)->AppendNull();
+      case Kind::kArray:
+        return Cast<Kind::kArray>(builder_)->AppendNull();
+      case Kind::kObject: {
+        auto root = builder_;
+        auto struct_builder = Cast<Kind::kObject>(builder_);
+        RETURN_NOT_OK(struct_builder->AppendNull());
+        for (int i = 0; i != struct_builder->num_fields(); ++i) {
+          builder_ = struct_builder->field_builder(i);
+          RETURN_NOT_OK(AppendNull());
+        }
+        builder_ = root;
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented("invalid builder Kind");
+    }
+  }
+
+  Status AppendBool(bool value) {
+    constexpr auto kind = Kind::kBoolean;
+    if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+      return IllegallyChangedTo(kind);
+    }
+    return Cast<kind>(builder_)->Append(value);
+  }
+
+  template <Kind::type kind>
+  Status AppendScalar(string_view scalar) {
+    if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+      return IllegallyChangedTo(kind);
+    }
+    auto index = static_cast<int32_t>(scalar_values_builder_.length());
+    RETURN_NOT_OK(Cast<kind>(builder_)->Append(index));
+    return scalar_values_builder_.Append(scalar);
+  }
+
+  /// @}
+
+  Status StartObjectImpl() {
+    constexpr auto kind = Kind::kObject;
+    if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+      return IllegallyChangedTo(kind);
+    }
+    auto struct_builder = Cast<kind>(builder_);
+    absent_fields_stack_.Push(struct_builder->num_fields(), true);
+    PushStacks();
+    return struct_builder->Append();
+  }
+
+  /// \brief helper for Key() functions
+  ///
+  /// sets the field builder with name key, or returns false if
+  /// there is no field with that name
+  bool SetFieldBuilder(string_view key) {
+    auto parent = Cast<Kind::kObject>(builder_stack_.back());
+    field_index_ = parent->GetFieldIndex(std::string(key));
+    if (ARROW_PREDICT_FALSE(field_index_ == -1)) {
+      return false;
+    }
+    builder_ = parent->field_builder(field_index_);
+    absent_fields_stack_[field_index_] = false;
+    return true;
+  }
+
+  Status EndObjectImpl() {
+    auto parent = Cast<Kind::kObject>(builder_stack_.back());
+
+    auto expected_count = absent_fields_stack_.TopSize();
+    for (field_index_ = 0; field_index_ != expected_count; ++field_index_) {
+      if (!absent_fields_stack_[field_index_]) {
+        continue;
+      }
+      builder_ = parent->field_builder(field_index_);
+      if (ARROW_PREDICT_FALSE(!builder_.nullable)) {
+        return ParseError("a required field was absent");
+      }
+      RETURN_NOT_OK(AppendNull());
+    }
+    absent_fields_stack_.Pop();
+    PopStacks();
+    return Status::OK();
+  }
+
+  Status StartArrayImpl() {
+    constexpr auto kind = Kind::kArray;
+    if (ARROW_PREDICT_FALSE(builder_.kind != kind)) {
+      return IllegallyChangedTo(kind);
+    }
+    PushStacks();
+    // append to the list builder in EndArrayImpl
+    builder_ = Cast<kind>(builder_)->value_builder();
+    return Status::OK();
+  }
+
+  Status EndArrayImpl(rapidjson::SizeType size) {
+    PopStacks();
+    // append to list_builder here
+    auto list_builder = Cast<Kind::kArray>(builder_);
+    return list_builder->Append(size);
+  }
+
+  /// helper method for StartArray and StartObject
+  /// adds the current builder to a stack so its
+  /// children can be visited and parsed.
+  void PushStacks() {
+    field_index_stack_.push_back(field_index_);
+    field_index_ = -1;
+    builder_stack_.push_back(builder_);
+  }
+
+  /// helper method for EndArray and EndObject
+  /// replaces the current builder with its parent
+  /// so parsing of the parent can continue
+  void PopStacks() {
+    field_index_ = field_index_stack_.back();
+    field_index_stack_.pop_back();
+    builder_ = builder_stack_.back();
+    builder_stack_.pop_back();
+  }
+
+  Status IllegallyChangedTo(Kind::type illegally_changed_to) {
+    return KindChangeError(builder_.kind, illegally_changed_to);
+  }
+
+  template <Kind::type kind>
+  std::vector<RawArrayBuilder<kind>>& arena() {
+    return std::get<static_cast<std::size_t>(kind)>(arenas_);
+  }
+
+  Status status_;
+  MemoryPool* pool_;
+  std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>,
+             std::vector<RawArrayBuilder<Kind::kNumber>>,
+             std::vector<RawArrayBuilder<Kind::kString>>,
+             std::vector<RawArrayBuilder<Kind::kArray>>,
+             std::vector<RawArrayBuilder<Kind::kObject>>>
+      arenas_;
+  BuilderPtr builder_;
+  // top of this stack is the parent of builder_
+  std::vector<BuilderPtr> builder_stack_;
+  // top of this stack refers to the fields of the highest *StructBuilder*
+  // in builder_stack_ (list builders don't have absent fields)
+  BitsetStack absent_fields_stack_;
+  // index of builder_ within its parent
+  int field_index_;
+  // top of this stack == field_index_
+  std::vector<int> field_index_stack_;
+  UnsafeStringBuilder scalar_values_builder_;
+  std::shared_ptr<Array> scalar_values_;
+  int32_t num_rows_ = 0;
+};
+
+template <UnexpectedFieldBehavior>
+class Handler;
+
+template <>
+class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
+ public:
+  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+      : HandlerBase(pool, scalar_storage) {}
+
+  Status Parse(const std::shared_ptr<Buffer>& json) override {
+    return DoParse(*this, json);
+  }
+
+  /// \ingroup rapidjson-handler-interface
+  ///
+  /// if an unexpected field is encountered, emit a parse error and bail
+  bool Key(const char* key, rapidjson::SizeType len, ...) {
+    if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
+      return true;
+    }
+    status_ = ParseError("unexpected field");
+    return false;
+  }
+};
+
+template <>
+class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
+ public:
+  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+      : HandlerBase(pool, scalar_storage) {}
+
+  Status Parse(const std::shared_ptr<Buffer>& json) override {
+    return DoParse(*this, json);
+  }
+
+  bool Null() {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::Null();
+  }
+
+  bool Bool(bool value) {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::Bool(value);
+  }
+
+  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::RawNumber(data, size);
+  }
+
+  bool String(const char* data, rapidjson::SizeType size, ...) {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::String(data, size);
+  }
+
+  bool StartObject() {
+    ++depth_;
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::StartObject();
+  }
+
+  /// \ingroup rapidjson-handler-interface
+  ///
+  /// if an unexpected field is encountered, skip until its value has been consumed
+  bool Key(const char* key, rapidjson::SizeType len, ...) {
+    MaybeStopSkipping();
+    if (Skipping()) {
+      return true;
+    }
+    if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
+      return true;
+    }
+    skip_depth_ = depth_;
+    return true;
+  }
+
+  bool EndObject(...) {
+    MaybeStopSkipping();
+    --depth_;
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::EndObject();
+  }
+
+  bool StartArray() {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::StartArray();
+  }
+
+  bool EndArray(rapidjson::SizeType size) {
+    if (Skipping()) {
+      return true;
+    }
+    return HandlerBase::EndArray(size);
+  }
+
+ private:
+  bool Skipping() { return depth_ >= skip_depth_; }
+
+  void MaybeStopSkipping() {
+    if (skip_depth_ == depth_) {
+      skip_depth_ = std::numeric_limits<int>::max();
+    }
+  }
+
+  int depth_ = 0;
+  int skip_depth_ = std::numeric_limits<int>::max();
+};
+
+template <>
+class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
+ public:
+  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+      : HandlerBase(pool, scalar_storage) {}
+
+  Status Parse(const std::shared_ptr<Buffer>& json) override {
+    return DoParse(*this, json);
+  }
+
+  bool Bool(bool value) {
+    if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kBoolean>())) {
+      return false;
+    }
+    return HandlerBase::Bool(value);
+  }
+
+  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+    if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) {
+      return false;
+    }
+    return HandlerBase::RawNumber(data, size);
+  }
+
+  bool String(const char* data, rapidjson::SizeType size, ...) {
+    if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) {
+      return false;
+    }
+    return HandlerBase::String(data, size);
+  }
+
+  bool StartObject() {
+    if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kObject>())) {
+      return false;
+    }
+    return HandlerBase::StartObject();
+  }
+
+  /// \ingroup rapidjson-handler-interface
+  ///
+  /// If an unexpected field is encountered, add a new builder to
+  /// the current parent builder. It is added as a NullBuilder with
+  /// (parent.length - 1) leading nulls. The next value parsed
+  /// will probably trigger promotion of this field from null
+  bool Key(const char* key, rapidjson::SizeType len, ...) {
+    if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
+      return true;
+    }
+    auto struct_builder = Cast<Kind::kObject>(builder_stack_.back());
+    auto leading_nulls = static_cast<uint32_t>(struct_builder->length() - 1);
+    builder_ = BuilderPtr(Kind::kNull, leading_nulls, true);
+    field_index_ = struct_builder->AddField(std::string(key, len), builder_);
+    return true;
+  }
+
+  bool StartArray() {
+    if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kArray>())) {
+      return false;
+    }
+    return HandlerBase::StartArray();
+  }
+
+ private:
+  // return true if a terminal error was encountered
+  template <Kind::type kind>
+  bool MaybePromoteFromNull() {
+    if (ARROW_PREDICT_TRUE(builder_.kind != Kind::kNull)) {
+      return false;
+    }
+    auto parent = builder_stack_.back();
+    if (parent.kind == Kind::kArray) {
+      auto list_builder = Cast<Kind::kArray>(parent);
+      DCHECK_EQ(list_builder->value_builder(), builder_);
+      status_ = MakeBuilder<kind>(builder_.index, &builder_);
+      if (ARROW_PREDICT_FALSE(!status_.ok())) {
+        return true;
+      }
+      list_builder = Cast<Kind::kArray>(parent);
+      list_builder->value_builder(builder_);
+    } else {
+      auto struct_builder = Cast<Kind::kObject>(parent);
+      DCHECK_EQ(struct_builder->field_builder(field_index_), builder_);
+      status_ = MakeBuilder<kind>(builder_.index, &builder_);
+      if (ARROW_PREDICT_FALSE(!status_.ok())) {
+        return true;
+      }
+      struct_builder = Cast<Kind::kObject>(parent);
+      struct_builder->field_builder(field_index_, builder_);
+    }
+    return false;
+  }
+};
+
+BlockParser::BlockParser(MemoryPool* pool, ParseOptions options,
+                         const std::shared_ptr<Buffer>& scalar_storage)
+    : pool_(pool), options_(options) {
+  DCHECK(options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ||
+         options_.explicit_schema != nullptr);
+  switch (options_.unexpected_field_behavior) {
+    case UnexpectedFieldBehavior::Ignore: {
+      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(
+          pool_, scalar_storage);
+      // FIXME(bkietz) move this to an Initialize()
+      ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
+      impl_ = std::move(handler);
+      break;
+    }
+    case UnexpectedFieldBehavior::Error: {
+      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Error>>(
+          pool_, scalar_storage);
+      ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
+      impl_ = std::move(handler);
+      break;
+    }
+    case UnexpectedFieldBehavior::InferType:
+      auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::InferType>>(
+          pool_, scalar_storage);
+      if (options.explicit_schema) {
+        ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
+      }
+      impl_ = std::move(handler);
+      break;
+  }
+}
+
+BlockParser::BlockParser(ParseOptions options,
+                         const std::shared_ptr<Buffer>& scalar_storage)
+    : BlockParser(default_memory_pool(), options, scalar_storage) {}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/parser.h b/cpp/src/arrow/json/parser.h
new file mode 100644
index 0000000..c001598
--- /dev/null
+++ b/cpp/src/arrow/json/parser.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_PARSER_H
+#define ARROW_JSON_PARSER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "arrow/builder.h"
+#include "arrow/json/options.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatch;
+
+namespace json {
+
+struct Kind {
+  enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject };
+};
+
+inline static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type k) {
+  static std::shared_ptr<const KeyValueMetadata> tags[] = {
+      key_value_metadata({{"json_kind", "null"}}),
+      key_value_metadata({{"json_kind", "boolean"}}),
+      key_value_metadata({{"json_kind", "number"}}),
+      key_value_metadata({{"json_kind", "string"}}),
+      key_value_metadata({{"json_kind", "array"}}),
+      key_value_metadata({{"json_kind", "object"}})};
+  return tags[static_cast<uint8_t>(k)];
+}
+
+inline static Status KindForType(const DataType& type, Kind::type* kind) {
+  struct {
+    Status Visit(const NullType&) { return SetKind(Kind::kNull); }
+    Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
+    Status Visit(const Number&) { return SetKind(Kind::kNumber); }
+    // XXX should TimeType & DateType be Kind::kNumber or Kind::kString?
+    Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
+    Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
+    Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
+    // XXX should Decimal128Type be Kind::kNumber or Kind::kString?
+    Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
+    Status Visit(const DictionaryType& dict_type) {
+      return KindForType(*dict_type.dictionary()->type(), kind_);
+    }
+    Status Visit(const ListType&) { return SetKind(Kind::kArray); }
+    Status Visit(const StructType&) { return SetKind(Kind::kObject); }
+    Status Visit(const DataType& not_impl) {
+      return Status::NotImplemented("JSON parsing of ", not_impl);
+    }
+    Status SetKind(Kind::type kind) {
+      *kind_ = kind;
+      return Status::OK();
+    }
+    Kind::type* kind_;
+  } visitor = {kind};
+  return VisitTypeInline(type, &visitor);
+}
+
+constexpr int32_t kMaxParserNumRows = 100000;
+
+/// \class BlockParser
+/// \brief A reusable block-based parser for JSON data
+///
+/// The parser takes a block of newline delimited JSON data and extracts
+/// keys and value pairs, inserting into provided ArrayBuilders.
+/// Parsed data is own by the
+/// parser, so the original buffer can be discarded after Parse() returns.
+class ARROW_EXPORT BlockParser {
+ public:
+  BlockParser(MemoryPool* pool, ParseOptions options,
+              const std::shared_ptr<Buffer>& scalar_storage);
+  BlockParser(ParseOptions options, const std::shared_ptr<Buffer>& scalar_storage);
+
+  /// \brief Parse a block of data insitu (destructively)
+  /// \warning The input must be null terminated
+  Status Parse(const std::shared_ptr<Buffer>& json) { return impl_->Parse(json); }
+
+  /// \brief Extract parsed data
+  Status Finish(std::shared_ptr<Array>* parsed) { return impl_->Finish(parsed); }
+
+  /// \brief Return the number of parsed rows
+  int32_t num_rows() const { return impl_->num_rows(); }
+
+  struct Impl {
+    virtual ~Impl() = default;
+    virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
+    virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
+    virtual int32_t num_rows() = 0;
+  };
+
+ protected:
+  ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
+
+  MemoryPool* pool_;
+  const ParseOptions options_;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace json
+}  // namespace arrow
+
+#endif  // ARROW_JSON_PARSER_H
diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
new file mode 100644
index 0000000..0e147ab
--- /dev/null
+++ b/cpp/src/arrow/json/reader.cc
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/reader.h"
+
+#include <unordered_map>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/parsing.h"
+
+namespace arrow {
+namespace json {
+
+using internal::StringConverter;
+
+Kind::type KindFromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
+  std::string kind_name = tag->value(0);
+  switch (kind_name[0]) {
+    case 'n':
+      if (kind_name[2] == 'l') {
+        return Kind::kNull;
+      } else {
+        return Kind::kNumber;
+      }
+    case 'b':
+      return Kind::kBoolean;
+    case 's':
+      return Kind::kString;
+    case 'o':
+      return Kind::kObject;
+    case 'a':
+      return Kind::kArray;
+    default:
+      ARROW_LOG(FATAL);
+      return Kind::kNull;
+  }
+}
+
+static Status Convert(const std::shared_ptr<DataType>& out_type,
+                      std::shared_ptr<Array> in, std::shared_ptr<Array>* out);
+
+struct ConvertImpl {
+  Status Visit(const NullType&) {
+    *out = in;
+    return Status::OK();
+  }
+  Status Visit(const BooleanType&) {
+    *out = in;
+    return Status::OK();
+  }
+  // handle conversion to types with StringConverter
+  template <typename T>
+  Status ConvertEachWith(const T& t, StringConverter<T>& convert_one) {
+    auto dict_array = static_cast<const DictionaryArray*>(in.get());
+    const StringArray& dict = static_cast<const StringArray&>(*dict_array->dictionary());
+    const Int32Array& indices = static_cast<const Int32Array&>(*dict_array->indices());
+    using Builder = typename TypeTraits<T>::BuilderType;
+    Builder builder(out_type, default_memory_pool());
+    RETURN_NOT_OK(builder.Resize(indices.length()));
+    for (int64_t i = 0; i != indices.length(); ++i) {
+      if (indices.IsNull(i)) {
+        builder.UnsafeAppendNull();
+        continue;
+      }
+      auto repr = dict.GetView(indices.GetView(i));
+      typename StringConverter<T>::value_type value;
+      if (!convert_one(repr.data(), repr.size(), &value)) {
+        return Status::Invalid("Failed of conversion of JSON to ", t, ":", repr);
+      }
+      builder.UnsafeAppend(value);
+    }
+    return builder.Finish(out);
+  }
+  template <typename T>
+  Status Visit(const T& t, decltype(StringConverter<T>())* = nullptr) {
+    StringConverter<T> convert_one;
+    return ConvertEachWith(t, convert_one);
+  }
+  // handle conversion to Timestamp
+  Status Visit(const TimestampType& t) {
+    StringConverter<TimestampType> convert_one(out_type);
+    return ConvertEachWith(t, convert_one);
+  }
+  Status VisitAs(const std::shared_ptr<DataType>& repr_type) {
+    std::shared_ptr<Array> repr_array;
+    RETURN_NOT_OK(Convert(repr_type, in, &repr_array));
+    auto data = repr_array->data();
+    data->type = out_type;
+    *out = MakeArray(data);
+    return Status::OK();
+  }
+  // handle half explicitly
+  Status Visit(const HalfFloatType&) { return VisitAs(float32()); }
+  // handle types represented as integers
+  template <typename T>
+  Status Visit(
+      const T& t,
+      typename std::enable_if<std::is_base_of<TimeType, T>::value ||
+                              std::is_base_of<DateType, T>::value>::type* = nullptr) {
+    return VisitAs(std::is_same<typename T::c_type, int64_t>::value ? int64() : int32());
+  }
+  // handle binary and string
+  template <typename T>
+  Status Visit(
+      const T& t,
+      typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type* = nullptr) {
+    auto dict_array = static_cast<const DictionaryArray*>(in.get());
+    const StringArray& dict = static_cast<const StringArray&>(*dict_array->dictionary());
+    const Int32Array& indices = static_cast<const Int32Array&>(*dict_array->indices());
+    using Builder = typename TypeTraits<T>::BuilderType;
+    Builder builder(out_type, default_memory_pool());
+    RETURN_NOT_OK(builder.Resize(indices.length()));
+    int64_t values_length = 0;
+    for (int64_t i = 0; i != indices.length(); ++i) {
+      if (indices.IsNull(i)) {
+        continue;
+      }
+      values_length += dict.GetView(indices.GetView(i)).size();
+    }
+    RETURN_NOT_OK(builder.ReserveData(values_length));
+    for (int64_t i = 0; i != indices.length(); ++i) {
+      if (indices.IsNull(i)) {
+        builder.UnsafeAppendNull();
+        continue;
+      }
+      auto value = dict.GetView(indices.GetView(i));
+      builder.UnsafeAppend(value);
+    }
+    return builder.Finish(out);
+  }
+  Status Visit(const ListType& t) {
+    auto list_array = static_cast<const ListArray*>(in.get());
+    std::shared_ptr<Array> values;
+    auto value_type = t.value_type();
+    RETURN_NOT_OK(Convert(value_type, list_array->values(), &values));
+    auto data = ArrayData::Make(out_type, in->length(),
+                                {in->null_bitmap(), list_array->value_offsets()},
+                                {values->data()}, in->null_count());
+    *out = MakeArray(data);
+    return Status::OK();
+  }
+  Status Visit(const StructType& t) {
+    auto struct_array = static_cast<const StructArray*>(in.get());
+    std::vector<std::shared_ptr<ArrayData>> child_data(t.num_children());
+    for (int i = 0; i != t.num_children(); ++i) {
+      std::shared_ptr<Array> child;
+      RETURN_NOT_OK(Convert(t.child(i)->type(), struct_array->field(i), &child));
+      child_data[i] = child->data();
+    }
+    auto data = ArrayData::Make(out_type, in->length(), {in->null_bitmap()},
+                                std::move(child_data), in->null_count());
+    *out = MakeArray(data);
+    return Status::OK();
+  }
+  Status Visit(const DataType& not_impl) {
+    return Status::NotImplemented("JSON parsing of ", not_impl);
+  }
+  std::shared_ptr<DataType> out_type;
+  std::shared_ptr<Array> in;
+  std::shared_ptr<Array>* out;
+};
+
+static Status Convert(const std::shared_ptr<DataType>& out_type,
+                      std::shared_ptr<Array> in, std::shared_ptr<Array>* out) {
+  ConvertImpl visitor = {out_type, in, out};
+  return VisitTypeInline(*out_type, &visitor);
+}
+
+static Status InferAndConvert(std::shared_ptr<DataType> expected,
+                              const std::shared_ptr<const KeyValueMetadata>& tag,
+                              const std::shared_ptr<Array>& in,
+                              std::shared_ptr<Array>* out) {
+  Kind::type kind = KindFromTag(tag);
+  switch (kind) {
+    case Kind::kObject: {
+      // FIXME(bkietz) in general expected fields may not be an exact prefix of parsed's
+      auto in_type = static_cast<StructType*>(in->type().get());
+      if (expected == nullptr) {
+        expected = struct_({});
+      }
+      auto expected_type = static_cast<StructType*>(expected.get());
+      if (in_type->num_children() == expected_type->num_children()) {
+        return Convert(expected, in, out);
+      }
+
+      auto fields = expected_type->children();
+      fields.resize(in_type->num_children());
+      std::vector<std::shared_ptr<ArrayData>> child_data(in_type->num_children());
+
+      for (int i = 0; i != in_type->num_children(); ++i) {
+        std::shared_ptr<DataType> expected_field_type;
+        if (i < expected_type->num_children()) {
+          expected_field_type = expected_type->child(i)->type();
+        }
+        auto in_field = in_type->child(i);
+        auto in_column = static_cast<StructArray*>(in.get())->field(i);
+        std::shared_ptr<Array> column;
+        RETURN_NOT_OK(InferAndConvert(expected_field_type, in_field->metadata(),
+                                      in_column, &column));
+        fields[i] = field(in_field->name(), column->type());
+        child_data[i] = column->data();
+      }
+      auto data =
+          ArrayData::Make(struct_(std::move(fields)), in->length(), {in->null_bitmap()},
+                          std::move(child_data), in->null_count());
+      *out = MakeArray(data);
+      return Status::OK();
+    }
+    case Kind::kArray: {
+      auto list_array = static_cast<const ListArray*>(in.get());
+      auto value_tag = list_array->list_type()->value_field()->metadata();
+      std::shared_ptr<Array> values;
+      if (expected != nullptr) {
+        RETURN_NOT_OK(InferAndConvert(expected->child(0)->type(), value_tag,
+                                      list_array->values(), &values));
+      } else {
+        RETURN_NOT_OK(InferAndConvert(nullptr, value_tag, list_array->values(), &values));
+      }
+      auto data = ArrayData::Make(list(values->type()), in->length(),
+                                  {in->null_bitmap(), list_array->value_offsets()},
+                                  {values->data()}, in->null_count());
+      *out = MakeArray(data);
+      return Status::OK();
+    }
+    default:
+      // an expected type overrides inferrence for scalars
+      // (but not nested types, which may have unexpected fields)
+      if (expected != nullptr) {
+        return Convert(expected, in, out);
+      }
+  }
+  switch (kind) {
+    case Kind::kNull:
+      return Convert(null(), in, out);
+    case Kind::kBoolean:
+      return Convert(boolean(), in, out);
+    case Kind::kNumber:
+      // attempt conversion to Int64 first
+      if (Convert(int64(), in, out).ok()) {
+        return Status::OK();
+      }
+      return Convert(float64(), in, out);
+    case Kind::kString:  // attempt conversion to Timestamp first
+      if (Convert(timestamp(TimeUnit::SECOND), in, out).ok()) {
+        return Status::OK();
+      }
+      return Convert(utf8(), in, out);
+    default:
+      return Status::Invalid("invalid JSON kind");
+  }
+}
+
+Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
+                std::shared_ptr<RecordBatch>* out) {
+  BlockParser parser(default_memory_pool(), options, json);
+  RETURN_NOT_OK(parser.Parse(json));
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser.Finish(&parsed));
+  std::shared_ptr<Array> converted;
+  auto schm = options.explicit_schema;
+  if (options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+    if (schm) {
+      RETURN_NOT_OK(InferAndConvert(struct_(schm->fields()), Tag(Kind::kObject), parsed,
+                                    &converted));
+    } else {
+      RETURN_NOT_OK(InferAndConvert(nullptr, Tag(Kind::kObject), parsed, &converted));
+    }
+    schm = schema(converted->type()->children());
+  } else {
+    RETURN_NOT_OK(Convert(struct_(schm->fields()), parsed, &converted));
+  }
+  std::vector<std::shared_ptr<Array>> columns(parsed->num_fields());
+  for (int i = 0; i != parsed->num_fields(); ++i) {
+    columns[i] = static_cast<StructArray*>(converted.get())->field(i);
+  }
+  *out = RecordBatch::Make(schm, parsed->length(), std::move(columns));
+  return Status::OK();
+}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/json/reader.h b/cpp/src/arrow/json/reader.h
new file mode 100644
index 0000000..6463793
--- /dev/null
+++ b/cpp/src/arrow/json/reader.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_READER_H
+#define ARROW_JSON_READER_H
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/json/options.h"  // IWYU pragma: keep
+#include "arrow/json/parser.h"   // IWYU pragma: keep
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class Table;
+
+namespace io {
+class InputStream;
+}  // namespace io
+
+namespace json {
+
+class ARROW_EXPORT TableReader {
+ public:
+  virtual ~TableReader() = default;
+
+  virtual Status Read(std::shared_ptr<Table>* out) = 0;
+
+  // XXX pass optional schema?
+  static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+                     const ReadOptions&, const ParseOptions&,
+                     std::shared_ptr<TableReader>* out);
+};
+
+ARROW_EXPORT Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
+                             std::shared_ptr<RecordBatch>* out);
+
+}  // namespace json
+}  // namespace arrow
+
+#endif  // ARROW_JSON_READER_H
diff --git a/cpp/src/arrow/json/test-common.h b/cpp/src/arrow/json/test-common.h
new file mode 100644
index 0000000..0e7f6e3
--- /dev/null
+++ b/cpp/src/arrow/json/test-common.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <rapidjson/writer.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace json {
+
+using rapidjson::StringBuffer;
+using Writer = rapidjson::Writer<StringBuffer>;
+
+inline static Status OK(bool ok) { return ok ? Status::OK() : Status::Invalid(""); }
+
+template <typename Engine>
+static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer);
+
+template <typename Engine>
+static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
+                       Writer* writer);
+
+template <typename Engine>
+static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e, Writer* writer) {
+  return Generate(schm->fields(), e, writer);
+}
+
+template <typename Engine>
+struct GenerateImpl {
+  Status Visit(const BooleanType&) {
+    return OK(writer.Bool(std::uniform_int_distribution<uint16_t>{}(e)&1));
+  }
+  template <typename T>
+  Status Visit(T const&, enable_if_unsigned_integer<T>* = nullptr) {
+    auto val = std::uniform_int_distribution<>{}(e);
+    return OK(writer.Uint64(static_cast<typename T::c_type>(val)));
+  }
+  template <typename T>
+  Status Visit(T const&, enable_if_signed_integer<T>* = nullptr) {
+    auto val = std::uniform_int_distribution<>{}(e);
+    return OK(writer.Int64(static_cast<typename T::c_type>(val)));
+  }
+  template <typename T>
+  Status Visit(T const&, enable_if_floating_point<T>* = nullptr) {
+    auto val = std::normal_distribution<typename T::c_type>{0, 1 << 10}(e);
+    return OK(writer.Double(val));
+  }
+  Status Visit(HalfFloatType const&) {
+    auto val = std::normal_distribution<double>{0, 1 << 10}(e);
+    return OK(writer.Double(val));
+  }
+  template <typename T>
+  Status Visit(T const&, enable_if_binary<T>* = nullptr) {
+    auto size = std::poisson_distribution<>{4}(e);
+    std::uniform_int_distribution<uint16_t> gen_char(32, 127);  // FIXME generate UTF8
+    std::string s(size, '\0');
+    for (char& ch : s) ch = static_cast<char>(gen_char(e));
+    return OK(writer.String(s.c_str()));
+  }
+  template <typename T>
+  Status Visit(
+      T const& t, typename std::enable_if<!is_number<T>::value>::type* = nullptr,
+      typename std::enable_if<!std::is_base_of<BinaryType, T>::value>::type* = nullptr) {
+    return Status::Invalid("can't generate a value of type " + t.name());
+  }
+  Status Visit(const ListType& t) {
+    auto size = std::poisson_distribution<>{4}(e);
+    writer.StartArray();
+    for (int i = 0; i != size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
+    return OK(writer.EndArray(size));
+  }
+  Status Visit(const StructType& t) { return Generate(t.children(), e, &writer); }
+  Engine& e;
+  rapidjson::Writer<rapidjson::StringBuffer>& writer;
+};
+
+template <typename Engine>
+static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer) {
+  if (std::uniform_real_distribution<>{0, 1}(e) < .2) {
+    // one out of 5 chance of null, anywhere
+    writer->Null();
+    return Status::OK();
+  }
+  GenerateImpl<Engine> visitor = {e, *writer};
+  return VisitTypeInline(*type, &visitor);
+}
+
+template <typename Engine>
+static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
+                       Writer* writer) {
+  RETURN_NOT_OK(OK(writer->StartObject()));
+  for (const auto& f : fields) {
+    writer->Key(f->name().c_str());
+    RETURN_NOT_OK(Generate(f->type(), e, writer));
+  }
+  return OK(writer->EndObject(static_cast<int>(fields.size())));
+}
+
+Status MakeBuffer(util::string_view data, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), data.size(), out));
+  std::copy(std::begin(data), std::end(data), (*out)->mutable_data());
+  return Status::OK();
+}
+
+// scalar values (numbers and strings) are parsed into a
+// dictionary<index:int32, value:string>. This can be decoded for ease of comparison
+Status DecodeStringDictionary(const DictionaryArray& dict_array,
+                              std::shared_ptr<Array>* decoded) {
+  const StringArray& dict = static_cast<const StringArray&>(*dict_array.dictionary());
+  const Int32Array& indices = static_cast<const Int32Array&>(*dict_array.indices());
+  StringBuilder builder;
+  RETURN_NOT_OK(builder.Resize(indices.length()));
+  for (int64_t i = 0; i != indices.length(); ++i) {
+    if (indices.IsNull(i)) {
+      builder.UnsafeAppendNull();
+      continue;
+    }
+    auto value = dict.GetView(indices.GetView(i));
+    RETURN_NOT_OK(builder.ReserveData(value.size()));
+    builder.UnsafeAppend(value);
+  }
+  return builder.Finish(decoded);
+}
+
+}  // namespace json
+}  // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index ceb6885..abae413 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -92,6 +92,14 @@ class ARROW_EXPORT RecordBatch {
   /// \return an Array object
   virtual std::shared_ptr<Array> column(int i) const = 0;
 
+  /// \brief Retrieve an array from the record batch
+  /// \param[in] name field name
+  /// \return an Array or null if no field was found
+  std::shared_ptr<Array> GetColumnByName(const std::string& name) const {
+    auto i = schema_->GetFieldIndex(name);
+    return i == -1 ? NULLPTR : column(i);
+  }
+
   /// \brief Retrieve an array's internaldata from the record batch
   /// \param[in] i field index, does not boundscheck
   /// \return an internal ArrayData object
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2ac34b4..31c9510 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -248,6 +248,14 @@ class ARROW_EXPORT Table {
   /// Return a column by index
   virtual std::shared_ptr<Column> column(int i) const = 0;
 
+  /// \brief Return a column by name
+  /// \param[in] name field name
+  /// \return an Array or null if no field was found
+  std::shared_ptr<Column> GetColumnByName(const std::string& name) const {
+    auto i = schema_->GetFieldIndex(name);
+    return i == -1 ? NULLPTR : column(i);
+  }
+
   /// \brief Remove column from the table, producing a new Table
   virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0;
 
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 2024899..852ddb0 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -381,11 +381,11 @@ bool Schema::Equals(const Schema& other, bool check_metadata) const {
 }
 
 std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) const {
-  int64_t i = GetFieldIndex(name);
+  int i = GetFieldIndex(name);
   return i == -1 ? nullptr : fields_[i];
 }
 
-int64_t Schema::GetFieldIndex(const std::string& name) const {
+int Schema::GetFieldIndex(const std::string& name) const {
   auto it = name_to_index_.find(name);
   if (it == name_to_index_.end()) {
     return -1;
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 4f61f2d..c775b11 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -827,7 +827,7 @@ class ARROW_EXPORT Schema {
   std::shared_ptr<Field> GetFieldByName(const std::string& name) const;
 
   /// Returns -1 if name not found
-  int64_t GetFieldIndex(const std::string& name) const;
+  int GetFieldIndex(const std::string& name) const;
 
   const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
 
diff --git a/cpp/src/arrow/util/stl.h b/cpp/src/arrow/util/stl.h
index 163ed40..4889814 100644
--- a/cpp/src/arrow/util/stl.h
+++ b/cpp/src/arrow/util/stl.h
@@ -18,6 +18,9 @@
 #ifndef ARROW_UTIL_STL_H
 #define ARROW_UTIL_STL_H
 
+#include <memory>
+#include <type_traits>
+#include <utility>
 #include <vector>
 
 #include "arrow/util/logging.h"
@@ -25,6 +28,20 @@
 namespace arrow {
 namespace internal {
 
+template <typename T, typename... A>
+typename std::enable_if<!std::is_array<T>::value, std::unique_ptr<T>>::type make_unique(
+    A&&... args) {
+  return std::unique_ptr<T>(new T(std::forward<A>(args)...));
+}
+
+template <typename T>
+typename std::enable_if<std::is_array<T>::value && std::extent<T>::value == 0,
+                        std::unique_ptr<T>>::type
+make_unique(std::size_t n) {
+  using value_type = typename std::remove_extent<T>::type;
+  return std::unique_ptr<value_type[]>(new value_type[n]);
+}
+
 template <typename T>
 inline std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t index) {
   DCHECK(!values.empty());