You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/20 14:54:07 UTC
[arrow] branch master updated: ARROW-694: [C++] Initial parser
interface for reading JSON into RecordBatches
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9c19bb6 ARROW-694: [C++] Initial parser interface for reading JSON into RecordBatches
9c19bb6 is described below
commit 9c19bb65c128f9e9a82e19b0c0caf575817e8fb2
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Feb 20 08:53:57 2019 -0600
ARROW-694: [C++] Initial parser interface for reading JSON into RecordBatches
( abandoning https://github.com/apache/arrow/pull/3206 )
Adds [`json` sub project](https://github.com/apache/arrow/pull/3592/files#diff-2443c7d7b39b992ea580f0fbd387284a) with:
- BlockParser which parses Buffers of json formatted data into a StructArray with minimal conversion
* true/false, and null fields are stored in BooleanArray and NullArray respectively
* strings are stored as indices into a single StringArray
* numbers are not converted; their string representations are stored alongside string values
* nested fields are stored as ListArray or StructArray of their parsed (unconverted) children
- Three approaches to handling unexpected fields:
1. Error on an unexpected field
2. Ignore unexpected fields
3. Infer the type of unexpected fields and add them to the schema
- [Convenience interface](https://github.com/apache/arrow/pull/3592/files#diff-d043a0249cc485b08d93767d2075bd83R124) for parsing a single chunk of json data into a RecordBatch with fully converted columns
- Chunker to process a stream of unchunked data for use by BlockParser (not currently used)
Author: Benjamin Kietzman <be...@gmail.com>
Author: Wes McKinney <we...@apache.org>
Closes #3592 from bkietz/ARROW-694-json-reader-WIP and squashes the following commits:
e42e5d730 <Wes McKinney> Add arrow_dependencies to arrow_flight.so dependencies to fix race condition with Flatbuffers
d67aff63e <Benjamin Kietzman> adding more comments to parser.cc
554b595d9 <Benjamin Kietzman> adding explanatory comments to json/parser.cc
0d0caa991 <Benjamin Kietzman> Add ARROW_PREDICT_* to conditions in parser.cc
d7d0a2eb6 <Benjamin Kietzman> fix doc error in chunker.h
29c23128a <Wes McKinney> Disable arrow-json-chunker-test
76d0431e2 <Wes McKinney> cmake-format
83150ec13 <Wes McKinney> Restore BinaryBuilder::UnsafeAppend for const std::string&
05fd9d189 <Benjamin Kietzman> add json project back (merge error)
69541ea87 <Benjamin Kietzman> correct test util includes
be720c812 <Benjamin Kietzman> Use compound statements in if()
baac46735 <Benjamin Kietzman> use const shared_ptr<T>& instead of move
6e86078f1 <Benjamin Kietzman> Move ParseOne to reader.cc
1bebda861 <Benjamin Kietzman> clean up Chunker's stream usage
0d6d92026 <Benjamin Kietzman> disabling chunker test
c1a7f4bd3 <Benjamin Kietzman> check status for generating list elements
292672aee <Benjamin Kietzman> add inline tag
f92f8508c <Benjamin Kietzman> add Status return to Generate
e3346485e <Benjamin Kietzman> remove misplaced const
9679b6f76 <Benjamin Kietzman> fix format issue, use SFINAE to detect StringConverter default constructibility
aaaf9e7e8 <Benjamin Kietzman> remove bitfields
10e4f0dc4 <Benjamin Kietzman> add missing virtual destructor
92ffc640d <Benjamin Kietzman> adding ParseOne interface for dead simple parsing
330615b95 <Benjamin Kietzman> adding first draft of parsing with type inference
69d7c5c00 <Benjamin Kietzman> Rewrite parser to defer conversion of strings and numbers
b9d5c3d2d <Benjamin Kietzman> adding Chunker implementation and tests
2677a575d <Benjamin Kietzman> use recommended loop style
e39a4a9e0 <Benjamin Kietzman> add (failing) test for '-0' consistency
0f3a3bc0f <Benjamin Kietzman> Added trivial parser benchmark and data generator
6472738b3 <Benjamin Kietzman> Refactored type inferrence
cb6a313d7 <Benjamin Kietzman> adding first draft of type inferrence to BlockParser
cc3698a44 <Benjamin Kietzman> refactoring Schema::GetFieldIndex to return int
17176f975 <Benjamin Kietzman> first sketch of JSON parser
---
cpp/src/arrow/CMakeLists.txt | 5 +
cpp/src/arrow/array.cc | 6 +-
cpp/src/arrow/array.h | 2 +
cpp/src/arrow/array/builder_binary.h | 4 +
cpp/src/arrow/flight/CMakeLists.txt | 1 +
cpp/src/arrow/json/CMakeLists.txt | 25 +
cpp/src/arrow/json/api.h | 24 +
cpp/src/arrow/json/chunker-test.cc | 207 +++++++
cpp/src/arrow/json/chunker.cc | 212 +++++++
cpp/src/arrow/json/chunker.h | 64 ++
cpp/src/arrow/json/options.cc | 28 +
cpp/src/arrow/json/options.h | 68 +++
cpp/src/arrow/json/parser-benchmark.cc | 70 +++
cpp/src/arrow/json/parser-test.cc | 249 ++++++++
cpp/src/arrow/json/parser.cc | 1034 ++++++++++++++++++++++++++++++++
cpp/src/arrow/json/parser.h | 128 ++++
cpp/src/arrow/json/reader.cc | 298 +++++++++
cpp/src/arrow/json/reader.h | 60 ++
cpp/src/arrow/json/test-common.h | 149 +++++
cpp/src/arrow/record_batch.h | 8 +
cpp/src/arrow/table.h | 8 +
cpp/src/arrow/type.cc | 4 +-
cpp/src/arrow/type.h | 2 +-
cpp/src/arrow/util/stl.h | 17 +
24 files changed, 2669 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 07c404a..39c342b 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -104,6 +104,10 @@ set(ARROW_SRCS
csv/options.cc
csv/parser.cc
csv/reader.cc
+ json/options.cc
+ json/chunker.cc
+ json/parser.cc
+ json/reader.cc
io/buffered.cc
io/compressed.cc
io/file.cc
@@ -321,6 +325,7 @@ add_arrow_benchmark(column-benchmark)
add_subdirectory(array)
add_subdirectory(csv)
+add_subdirectory(json)
add_subdirectory(io)
add_subdirectory(util)
add_subdirectory(vendored)
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 8ceb6ea..563ac25 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -266,8 +266,12 @@ void ListArray::SetData(const std::shared_ptr<ArrayData>& data) {
values_ = MakeArray(data_->child_data[0]);
}
+const ListType* ListArray::list_type() const {
+ return checked_cast<const ListType*>(data_->type.get());
+}
+
std::shared_ptr<DataType> ListArray::value_type() const {
- return checked_cast<const ListType&>(*type()).value_type();
+ return list_type()->value_type();
}
std::shared_ptr<Array> ListArray::values() const { return values_; }
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index f8d451a..35c7bd8 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -486,6 +486,8 @@ class ARROW_EXPORT ListArray : public Array {
static Status FromArrays(const Array& offsets, const Array& values, MemoryPool* pool,
std::shared_ptr<Array>* out);
+ const ListType* list_type() const;
+
/// \brief Return array object containing the list's values
std::shared_ptr<Array> values() const;
diff --git a/cpp/src/arrow/array/builder_binary.h b/cpp/src/arrow/array/builder_binary.h
index 67d579d..3bc930c 100644
--- a/cpp/src/arrow/array/builder_binary.h
+++ b/cpp/src/arrow/array/builder_binary.h
@@ -88,6 +88,10 @@ class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
UnsafeAppend(value.c_str(), static_cast<int32_t>(value.size()));
}
+ void UnsafeAppend(util::string_view value) {
+ UnsafeAppend(value.data(), static_cast<int32_t>(value.size()));
+ }
+
void UnsafeAppendNull() {
const int64_t num_bytes = value_data_builder_.length();
offsets_builder_.UnsafeAppend(static_cast<int32_t>(num_bytes));
diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt
index f02bc21..6f44c01 100644
--- a/cpp/src/arrow/flight/CMakeLists.txt
+++ b/cpp/src/arrow/flight/CMakeLists.txt
@@ -92,6 +92,7 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
DEPENDENCIES
${GTEST_LIBRARY}
flight_grpc_gen
+ arrow_dependencies
SHARED_LINK_LIBS
arrow_shared
arrow_flight_shared
diff --git a/cpp/src/arrow/json/CMakeLists.txt b/cpp/src/arrow/json/CMakeLists.txt
new file mode 100644
index 0000000..a244b8c
--- /dev/null
+++ b/cpp/src/arrow/json/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_arrow_test(parser-test PREFIX "arrow-json")
+
+# TODO(wesm): ARROW-694 this fails valgrind
+# add_arrow_test(chunker-test PREFIX "arrow-json")
+
+add_arrow_benchmark(parser-benchmark PREFIX "arrow-json")
+
+arrow_install_all_headers("arrow/json")
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/api.h
new file mode 100644
index 0000000..00fbc2e
--- /dev/null
+++ b/cpp/src/arrow/json/api.h
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_API_H
+#define ARROW_JSON_API_H
+
+#include "arrow/json/options.h"
+#include "arrow/json/reader.h"
+
+#endif // ARROW_JSON_API_H
diff --git a/cpp/src/arrow/json/chunker-test.cc b/cpp/src/arrow/json/chunker-test.cc
new file mode 100644
index 0000000..36e2953
--- /dev/null
+++ b/cpp/src/arrow/json/chunker-test.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <numeric>
+#include <string>
+
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/json/chunker.h"
+#include "arrow/json/options.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace json {
+
+// Use no nested objects and no string literals containing braces in this test.
+// This way the positions of '{' and '}' can be used as simple proxies
+// for object begin/end.
+
+using util::string_view;
+
+// Concatenate the elements of lines, writing delimiter after every element
+// (the last one included).
+template <typename Lines>
+std::string join(Lines&& lines, std::string delimiter) {
+  std::string result;
+  for (const auto& element : lines) {
+    result += element;
+    result += delimiter;
+  }
+  return result;
+}
+
+// Parse one_line as a JSON document and return it re-serialized by
+// rapidjson::PrettyWriter.
+// NOTE(review): one_line is presumably taken by value because ParseInsitu
+// writes into the buffer it parses (hence the const_cast) -- confirm.
+std::string PrettyPrint(std::string one_line) {
+ rapidjson::Document document;
+ document.ParseInsitu(const_cast<char*>(one_line.data()));
+ rapidjson::StringBuffer sb;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
+ document.Accept(writer);
+ return sb.GetString();
+}
+
+bool WhitespaceOnly(string_view s) {
+ return s.find_first_not_of(" \t\r\n") == string_view::npos;
+}
+
+// Strip one brace-delimited object from the front of *str.
+//
+// Relies on the convention stated at the top of this file: no nested objects and
+// no braces inside string literals, so the first '}' closes the first object.
+//
+// Returns the number of characters consumed (leading whitespace included).
+// If *str does not begin with a complete object, *str is reset to an empty view
+// and string_view::npos is returned.
+std::size_t ConsumeWholeObject(string_view* str) {
+  auto fail = [str] {
+    *str = string_view();
+    return string_view::npos;
+  };
+  auto open_brace = str->find_first_not_of(" \t\r\n");
+  // npos: the view is empty or holds nothing but whitespace
+  if (open_brace == string_view::npos) return fail();
+  if (str->at(open_brace) != '{') return fail();
+  // a successful find already guarantees the character found is '}'
+  auto close_brace = str->find('}');
+  if (close_brace == string_view::npos) return fail();
+  auto length = close_brace + 1;
+  *str = str->substr(length);
+  return length;
+}
+
+// Run chunker.Process on block and check that the chunk it returns is made up
+// of exactly expected_count whole objects, optionally separated or followed by
+// whitespace.
+void AssertWholeObjects(Chunker& chunker, string_view block, int expected_count) {
+ string_view whole;
+ ASSERT_OK(chunker.Process(block, &whole));
+ int count = 0;
+ while (!WhitespaceOnly(whole)) {
+ if (ConsumeWholeObject(&whole) == string_view::npos) FAIL();
+ ++count;
+ }
+ ASSERT_EQ(count, expected_count);
+}
+
+// Check that chunker finds total_count whole objects in str. Then, once per
+// object, check that cutting str just before its last '}' and dropping its
+// first object each leave one fewer whole object than str held before.
+void AssertChunking(Chunker& chunker, std::string str, int total_count) {
+ // First chunkize whole JSON block
+ AssertWholeObjects(chunker, str, total_count);
+
+ // Then chunkize incomplete substrings of the block
+ for (int i = 0; i != total_count; ++i) {
+ // ensure shearing the closing brace off the last object causes it to be chunked out
+ string_view str_view(str);
+ auto last_brace = str_view.find_last_of('}');
+ AssertWholeObjects(chunker, str.substr(0, last_brace), total_count - i - 1);
+
+ // ensure skipping one object reduces the count by one
+ ASSERT_NE(ConsumeWholeObject(&str_view), string_view::npos);
+ str = str_view.to_string();
+ AssertWholeObjects(chunker, str, total_count - i - 1);
+ }
+}
+
+// Split str at its midpoint and check that chunker
+//  - finds exactly one whole object in the first half,
+//  - completes the object cut by the split from the start of the second half,
+//  - leaves a remainder of the second half which begins with a whole object.
+void AssertStraddledChunking(Chunker& chunker, string_view str) {
+ auto first_half = str.substr(0, str.size() / 2).to_string();
+ auto second_half = str.substr(str.size() / 2);
+ AssertChunking(chunker, first_half, 1);
+ string_view first_whole;
+ ASSERT_OK(chunker.Process(first_half, &first_whole));
+ ASSERT_TRUE(string_view(first_half).starts_with(first_whole));
+ // whatever follows the whole objects in the first half is the cut object
+ auto partial = string_view(first_half).substr(first_whole.size());
+ string_view completion;
+ ASSERT_OK(chunker.Process(partial, second_half, &completion));
+ ASSERT_TRUE(second_half.starts_with(completion));
+ // partial followed by its completion must form one non-empty whole object
+ auto straddling = partial.to_string() + completion.to_string();
+ string_view straddling_view(straddling);
+ auto length = ConsumeWholeObject(&straddling_view);
+ ASSERT_NE(length, string_view::npos);
+ ASSERT_NE(length, 0);
+ auto final_whole = second_half.substr(completion.size());
+ length = ConsumeWholeObject(&final_whole);
+ ASSERT_NE(length, string_view::npos);
+ ASSERT_NE(length, 0);
+}
+
+std::unique_ptr<Chunker> MakeChunker(bool newlines_in_values) {
+ auto options = ParseOptions::Defaults();
+ options.newlines_in_values = newlines_in_values;
+ return Chunker::Make(options);
+}
+
+class BaseChunkerTest : public ::testing::TestWithParam<bool> {
+ protected:
+ void SetUp() override { chunker_ = MakeChunker(GetParam()); }
+
+ std::unique_ptr<Chunker> chunker_;
+};
+
+INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
+
+INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values(false));
+
+constexpr auto object_count = 3;
+const std::vector<std::string>& lines() {
+ static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})",
+ R"({"0":"def","1":"","2":"gh"})",
+ R"({"0":"","1":"ij","2":"kl"})"};
+ return l;
+}
+
+TEST_P(BaseChunkerTest, Basics) {
+ AssertChunking(*chunker_, join(lines(), "\n"), object_count);
+}
+
+TEST_P(BaseChunkerTest, Empty) {
+ AssertChunking(*chunker_, "\n", 0);
+ AssertChunking(*chunker_, "\n\n", 0);
+}
+
+TEST(ChunkerTest, PrettyPrinted) {
+ std::string pretty[object_count];
+ std::transform(std::begin(lines()), std::end(lines()), std::begin(pretty), PrettyPrint);
+ auto chunker = MakeChunker(true);
+ AssertChunking(*chunker, join(pretty, "\n"), object_count);
+}
+
+TEST(ChunkerTest, SingleLine) {
+ auto chunker = MakeChunker(true);
+ AssertChunking(*chunker, join(lines(), ""), object_count);
+}
+
+TEST_P(BaseChunkerTest, Straddling) {
+ AssertStraddledChunking(*chunker_, join(lines(), "\n"));
+}
+
+TEST(ChunkerTest, StraddlingPrettyPrinted) {
+ std::string pretty[object_count];
+ std::transform(std::begin(lines()), std::end(lines()), std::begin(pretty), PrettyPrint);
+ auto chunker = MakeChunker(true);
+ AssertStraddledChunking(*chunker, join(pretty, "\n"));
+}
+
+TEST(ChunkerTest, StraddlingSingleLine) {
+ auto chunker = MakeChunker(true);
+ AssertStraddledChunking(*chunker, join(lines(), ""));
+}
+
+TEST_P(BaseChunkerTest, StraddlingEmpty) {
+ auto joined = join(lines(), "\n");
+ auto first = string_view(joined).substr(0, lines()[0].size() + 1);
+ auto rest = string_view(joined).substr(first.size());
+ string_view first_whole;
+ ASSERT_OK(chunker_->Process(first, &first_whole));
+ auto partial = first.substr(first_whole.size());
+ string_view completion;
+ ASSERT_OK(chunker_->Process(partial, rest, &completion));
+ ASSERT_EQ(completion.size(), 0);
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc
new file mode 100644
index 0000000..02daa06
--- /dev/null
+++ b/cpp/src/arrow/json/chunker.cc
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/chunker.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#if defined(ARROW_HAVE_SSE4_2)
+#define RAPIDJSON_SSE42 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+#if defined(ARROW_HAVE_SSE2)
+#define RAPIDJSON_SSE2 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/stl.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+namespace json {
+
+using internal::make_unique;
+using util::string_view;
+
+/// Error returned by the chunkers in this file when an object begun in one block
+/// is not completed anywhere in the block that follows it.
+Status StraddlingTooLarge() {
+ return Status::Invalid("straddling object straddles two block boundaries");
+}
+
+/// Remove leading whitespace from *str.
+///
+/// \param[in,out] str view to trim; left empty if it holds only whitespace
+/// \return the number of characters removed
+std::size_t ConsumeWhitespace(string_view* str) {
+#if defined(ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD)
+  auto nonws_begin =
+      rapidjson::SkipWhitespace_SIMD(str->data(), str->data() + str->size());
+  auto ws_count = nonws_begin - str->data();
+  *str = str->substr(ws_count);
+  return static_cast<std::size_t>(ws_count);
+#undef ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD
+#else
+  auto ws_count = str->find_first_not_of(" \t\r\n");
+  if (ws_count == string_view::npos) {
+    // Empty or whitespace-only input: every character is consumed.
+    // substr(npos) would be out of range, so clamp to the view's size.
+    ws_count = str->size();
+  }
+  *str = str->substr(ws_count);
+  return ws_count;
+#endif
+}
+
+/// Chunker for input in which a newline always ends an object
+/// (Chunker::Make returns it when newlines_in_values is false).
+class NewlinesStrictlyDelimitChunker : public Chunker {
+ public:
+  /// Keep everything up to and including the last '\n' or '\r' of block;
+  /// the chunk is empty when block holds neither character.
+  Status Process(string_view block, string_view* chunked) override {
+    const auto final_newline = block.find_last_of("\n\r");
+    *chunked = (final_newline == string_view::npos)
+                   ? string_view()
+                   : block.substr(0, final_newline + 1);
+    return Status::OK();
+  }
+
+  /// Complete partial with the start of block, up to and including the first
+  /// '\n' or '\r'. A partial holding only whitespace needs no completion.
+  Status Process(string_view partial, string_view block,
+                 string_view* completion) override {
+    ConsumeWhitespace(&partial);
+    if (partial.empty()) {
+      // nothing to complete, so block is not examined at all
+      *completion = string_view();
+      return Status::OK();
+    }
+    const auto leading_newline = block.find_first_of("\n\r");
+    if (leading_newline != string_view::npos) {
+      *completion = block.substr(0, leading_newline + 1);
+      return Status::OK();
+    }
+    // block holds no newline, so the object spans *two* block boundaries;
+    // the caller should retry with a larger buffer
+    return StraddlingTooLarge();
+  }
+};
+
+/// RapidJson custom stream for reading JSON stored in multiple buffers
+/// http://rapidjson.org/md_doc_stream.html#CustomStream
+///
+/// The views are read back to back in the order given. Only Peek, Take and Tell
+/// are functional; the output half of the stream interface logs a FATAL
+/// "not implemented" message.
+class MultiStringStream {
+ public:
+  using Ch = char;
+  explicit MultiStringStream(std::vector<string_view> strings)
+      : strings_(std::move(strings)) {
+    // Peek() and Take() read element 0 of the current view, so empty views must
+    // actually be erased: std::remove on its own only moves the kept elements
+    // forward and leaves the container's size unchanged.
+    strings_.erase(std::remove(strings_.begin(), strings_.end(), string_view("")),
+                   strings_.end());
+    // stored in reverse so that the current view is back() and pop_back() advances
+    std::reverse(strings_.begin(), strings_.end());
+  }
+  /// Next character without consuming it, or '\0' once every view is exhausted
+  char Peek() const {
+    if (strings_.size() == 0) return '\0';
+    return strings_.back()[0];
+  }
+  /// Consume and return the next character, or '\0' once every view is exhausted
+  char Take() {
+    if (strings_.size() == 0) return '\0';
+    char taken = strings_.back()[0];
+    if (strings_.back().size() == 1) {
+      // that was the last character of the current view; move on to the next
+      strings_.pop_back();
+    } else {
+      strings_.back() = strings_.back().substr(1);
+    }
+    ++index_;
+    return taken;
+  }
+  /// Number of characters consumed by Take() so far
+  std::size_t Tell() const { return index_; }
+  void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
+  void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
+  char* PutBegin() {
+    ARROW_LOG(FATAL) << "not implemented";
+    return nullptr;
+  }
+  std::size_t PutEnd(char*) {
+    ARROW_LOG(FATAL) << "not implemented";
+    return 0;
+  }
+
+ private:
+  std::size_t index_ = 0;
+  std::vector<string_view> strings_;
+};
+
+/// Parse a single JSON value from stream, discarding its contents.
+///
+/// \return stream.Tell() after a successful parse, 0 if rapidjson reports an
+/// empty document, string_view::npos for any other parse error
+template <typename Stream>
+std::size_t ConsumeWholeObject(Stream&& stream) {
+  static constexpr unsigned kFlags = rapidjson::kParseIterativeFlag |
+                                     rapidjson::kParseStopWhenDoneFlag |
+                                     rapidjson::kParseNumbersAsStringsFlag;
+  rapidjson::BaseReaderHandler<rapidjson::UTF8<>> discard;
+  rapidjson::Reader reader;
+  const auto code = reader.Parse<kFlags>(stream, discard).Code();
+  if (code == rapidjson::kParseErrorNone) {
+    return stream.Tell();
+  }
+  if (code == rapidjson::kParseErrorDocumentEmpty) {
+    return 0;
+  }
+  // rapidjson emitted an error, the most recent object was partial
+  return string_view::npos;
+}
+
+/// Chunker which finds object boundaries by parsing, so that objects may span
+/// several lines (Chunker::Make returns it when newlines_in_values is true).
+class ParsingChunker : public Chunker {
+ public:
+  /// Find the longest prefix of block which consists of whole objects.
+  /// block is read through rapidjson::StringStream, which is given only a
+  /// pointer and no length.
+  Status Process(string_view block, string_view* chunked) override {
+    if (block.size() == 0) {
+      *chunked = string_view();
+      return Status::OK();
+    }
+    std::size_t total_length = 0;
+    for (auto consumed = block;; consumed = block.substr(total_length)) {
+      auto length = ConsumeWholeObject(rapidjson::StringStream(consumed.data()));
+      if (length == string_view::npos || length == 0) {
+        // found incomplete object or consumed is empty
+        break;
+      }
+      if (length > consumed.size()) {
+        // the stream is not bounded by the view, so the parse ran past the end of
+        // block; count only the characters which belong to block
+        total_length += consumed.size();
+        break;
+      }
+      total_length += length;
+    }
+    *chunked = block.substr(0, total_length);
+    return Status::OK();
+  }
+
+  /// Find the prefix of block which completes the object begun in partial.
+  /// The completion is empty when partial holds only whitespace or already
+  /// holds a whole object.
+  Status Process(string_view partial, string_view block,
+                 string_view* completion) override {
+    ConsumeWhitespace(&partial);
+    if (partial.size() == 0) {
+      // if partial is empty, don't bother looking for completion
+      *completion = string_view();
+      return Status::OK();
+    }
+    auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
+    if (length == string_view::npos) {
+      // straddling object straddles *two* block boundaries.
+      // retry with larger buffer
+      return StraddlingTooLarge();
+    }
+    if (length <= partial.size()) {
+      // The object ended inside partial, so nothing is needed from block.
+      // Without this guard the unsigned subtraction below would wrap around
+      // and the whole of block would be reported as the completion.
+      *completion = string_view();
+      return Status::OK();
+    }
+    *completion = block.substr(0, length - partial.size());
+    return Status::OK();
+  }
+};
+
+/// Pick the chunker implementation for options: objects which may contain
+/// newlines need ParsingChunker, otherwise newlines alone delimit objects.
+std::unique_ptr<Chunker> Chunker::Make(ParseOptions options) {
+  if (options.newlines_in_values) {
+    return make_unique<ParsingChunker>();
+  }
+  return make_unique<NewlinesStrictlyDelimitChunker>();
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h
new file mode 100644
index 0000000..1e71dd4
--- /dev/null
+++ b/cpp/src/arrow/json/chunker.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_CHUNKER_H
+#define ARROW_JSON_CHUNKER_H
+
+#include <memory>
+
+#include "arrow/json/options.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/sse-util.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace json {
+
+/// \class Chunker
+/// \brief A reusable block-based chunker for JSON data
+///
+/// The chunker takes a block of JSON data and finds a suitable place
+/// to cut it up without splitting an object.
+class ARROW_EXPORT Chunker {
+ public:
+ virtual ~Chunker() = default;
+
+ /// \brief Carve up a chunk in a block of data to contain only whole objects
+ /// \param[in] block json data to be chunked, must end with '\0'
+ /// \param[out] chunked subrange of block containing whole json objects
+ virtual Status Process(util::string_view block, util::string_view* chunked) = 0;
+
+ /// \brief Carve the completion of a partial object out of a block
+ /// \param[in] partial incomplete json object
+ /// \param[in] block json data
+ /// \param[out] completion subrange of block containing the completion of partial
+ virtual Status Process(util::string_view partial, util::string_view block,
+ util::string_view* completion) = 0;
+
+ /// \brief Create a chunker, chosen according to options.newlines_in_values
+ static std::unique_ptr<Chunker> Make(ParseOptions options);
+
+ protected:
+ Chunker() = default;
+ ARROW_DISALLOW_COPY_AND_ASSIGN(Chunker);
+};
+
+} // namespace json
+} // namespace arrow
+
+#endif // ARROW_JSON_CHUNKER_H
diff --git a/cpp/src/arrow/json/options.cc b/cpp/src/arrow/json/options.cc
new file mode 100644
index 0000000..dc5e628
--- /dev/null
+++ b/cpp/src/arrow/json/options.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/options.h"
+
+namespace arrow {
+namespace json {
+
+ParseOptions ParseOptions::Defaults() { return ParseOptions(); }
+
+ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h
new file mode 100644
index 0000000..0ec1b29
--- /dev/null
+++ b/cpp/src/arrow/json/options.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_OPTIONS_H
+#define ARROW_JSON_OPTIONS_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class DataType;
+
+namespace json {
+
+enum class UnexpectedFieldBehavior : char { Ignore, Error, InferType };
+
+struct ARROW_EXPORT ParseOptions {
+ // Parsing options
+
+ // Optional explicit schema (no type inference, ignores other fields)
+ std::shared_ptr<Schema> explicit_schema;
+
+ // Whether objects may be printed across multiple lines (for example pretty printed)
+ // NB: if false, input must end with an empty line
+ bool newlines_in_values = false;
+
+ // How should parse handle fields outside the explicit_schema?
+ UnexpectedFieldBehavior unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+ static ParseOptions Defaults();
+};
+
+struct ARROW_EXPORT ReadOptions {
+ // Reader options
+
+ // Whether to use the global CPU thread pool
+ bool use_threads = true;
+ // Block size we request from the IO layer; also determines the size of
+ // chunks when use_threads is true
+ int32_t block_size = 1 << 20; // 1 MB
+
+ static ReadOptions Defaults();
+};
+
+} // namespace json
+} // namespace arrow
+
+#endif // ARROW_JSON_OPTIONS_H
diff --git a/cpp/src/arrow/json/parser-benchmark.cc b/cpp/src/arrow/json/parser-benchmark.cc
new file mode 100644
index 0000000..82f0b33
--- /dev/null
+++ b/cpp/src/arrow/json/parser-benchmark.cc
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <iostream>
+#include <string>
+
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
+#include "arrow/json/test-common.h"
+#include "arrow/testing/gtest_util.h"
+
+namespace arrow {
+namespace json {
+
+static void BenchmarkJSONParsing(benchmark::State& state, // NOLINT non-const reference
+ const std::string& json, int32_t num_rows,
+ ParseOptions options) {
+ for (auto _ : state) {
+ std::shared_ptr<Buffer> src;
+ ABORT_NOT_OK(MakeBuffer(json, &src));
+ BlockParser parser(options, src);
+ ABORT_NOT_OK(parser.Parse(src));
+ if (parser.num_rows() != num_rows) {
+ std::cerr << "Parsing incomplete\n";
+ std::abort();
+ }
+ std::shared_ptr<Array> parsed;
+ ABORT_NOT_OK(parser.Finish(&parsed));
+ }
+ state.SetBytesProcessed(state.iterations() * json.size());
+}
+
+static void BM_ParseJSONBlockWithSchema(
+ benchmark::State& state) { // NOLINT non-const reference
+ const int32_t num_rows = 5000;
+ auto options = ParseOptions::Defaults();
+ options.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+ options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+ std::mt19937_64 engine;
+ std::string json;
+ for (int i = 0; i != num_rows; ++i) {
+ StringBuffer sb;
+ Writer writer(sb);
+ ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+ json += sb.GetString();
+ json += "\n";
+ }
+ BenchmarkJSONParsing(state, json, num_rows, options);
+}
+
+BENCHMARK(BM_ParseJSONBlockWithSchema)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/parser-test.cc b/cpp/src/arrow/json/parser-test.cc
new file mode 100644
index 0000000..c3af4a7
--- /dev/null
+++ b/cpp/src/arrow/json/parser-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <iomanip>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/ipc/json-simple.h"
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
+#include "arrow/json/reader.h"
+#include "arrow/json/test-common.h"
+#include "arrow/status.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string_view.h"
+
+namespace arrow {
+
+using util::string_view;
+
+namespace json {
+
+/// Test input: four JSON objects, one per line, holding only scalar fields
+/// ("hello": number, "world": boolean or null, "yo": string, null or absent).
+std::string scalars_only_src() {
+  static const char kSource[] = R"(
+ { "hello": 3.5, "world": false, "yo": "thing" }
+ { "hello": 3.2, "world": null }
+ { "hello": 3.4, "world": null, "yo": "\u5fcd" }
+ { "hello": 0.0, "world": true, "yo": null }
+ )";
+  return kSource;
+}
+
+/// Test input: the scalar rows extended with a list field ("arr") and a
+/// struct field ("nuf"), each of which is also null in one of the rows.
+std::string nested_src() {
+  static const char kSource[] = R"(
+ { "hello": 3.5, "world": false, "yo": "thing", "arr": [1, 2, 3], "nuf": {} }
+ { "hello": 3.2, "world": null, "arr": [2], "nuf": null }
+ { "hello": 3.4, "world": null, "yo": "\u5fcd", "arr": [], "nuf": { "ps": 78 } }
+ { "hello": 0.0, "world": true, "yo": null, "arr": null, "nuf": { "ps": 90 } }
+ )";
+  return kSource;
+}
+
+void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual);  // forward declaration: AssertRawArraysEqual calls this for STRUCT arrays, and it calls AssertRawArraysEqual per field
+
+/// Assert that `actual`, an unconverted array as produced by BlockParser,
+/// matches `expected`.
+///
+/// Dispatches on the type of `actual`: boolean and null arrays are compared
+/// directly, dictionary arrays are decoded and compared against a string
+/// array, lists are compared by validity bitmap, offsets and (recursively)
+/// values, and structs field by field. Any other type fails the test.
+void AssertRawArraysEqual(const Array& expected, const Array& actual) {
+  const auto actual_id = actual.type_id();
+
+  if (actual_id == Type::BOOL || actual_id == Type::NA) {
+    AssertArraysEqual(expected, actual);
+    return;
+  }
+
+  if (actual_id == Type::DICTIONARY) {
+    // strings and numbers are emitted dictionary encoded; expected is plain utf8
+    ASSERT_EQ(expected.type_id(), Type::STRING);
+    const auto& encoded = static_cast<const DictionaryArray&>(actual);
+    std::shared_ptr<Array> decoded;
+    ASSERT_OK(DecodeStringDictionary(encoded, &decoded));
+    AssertArraysEqual(expected, *decoded);
+    return;
+  }
+
+  if (actual_id == Type::LIST) {
+    ASSERT_EQ(expected.type_id(), Type::LIST);
+    AssertBufferEqual(*expected.null_bitmap(), *actual.null_bitmap());
+    // buffers[1] holds the list offsets
+    AssertBufferEqual(*expected.data()->buffers[1], *actual.data()->buffers[1]);
+    const auto& expected_list = static_cast<const ListArray&>(expected);
+    const auto& actual_list = static_cast<const ListArray&>(actual);
+    AssertRawArraysEqual(*expected_list.values(), *actual_list.values());
+    return;
+  }
+
+  if (actual_id == Type::STRUCT) {
+    ASSERT_EQ(expected.type_id(), Type::STRUCT);
+    AssertRawStructArraysEqual(static_cast<const StructArray&>(expected),
+                               static_cast<const StructArray&>(actual));
+    return;
+  }
+
+  FAIL();
+}
+
+/// Assert that two struct arrays have the same number of fields, that the
+/// fields carry the same names in the same order, and that each pair of
+/// child arrays passes AssertRawArraysEqual.
+void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual) {
+  ASSERT_EQ(expected.num_fields(), actual.num_fields());
+  const int field_count = expected.num_fields();
+  for (int index = 0; index < field_count; ++index) {
+    ASSERT_EQ(expected.type()->child(index)->name(), actual.type()->child(index)->name());
+    AssertRawArraysEqual(*expected.field(index), *actual.field(index));
+  }
+}
+
+/// Parse `src_str` with a BlockParser and check the unconverted columns.
+///
+/// \param options parse options handed to the BlockParser
+/// \param src_str JSON text to parse
+/// \param fields name and expected (raw) type of each column to check
+/// \param columns_json expected content of each column, as JSON understood by
+///        ArrayFromJSON; must be parallel to `fields`
+///
+/// Columns are looked up by name in the parsed struct array and compared with
+/// AssertRawArraysEqual. A column that is missing from the parsed output is
+/// reported as a test failure.
+void AssertParseColumns(ParseOptions options, string_view src_str,
+                        const std::vector<std::shared_ptr<Field>>& fields,
+                        const std::vector<std::string>& columns_json) {
+  // guard against indexing columns_json out of bounds below
+  ASSERT_EQ(fields.size(), columns_json.size());
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer(src_str, &src));
+  BlockParser parser(options, src);
+  ASSERT_OK(parser.Parse(src));
+  std::shared_ptr<Array> parsed;
+  ASSERT_OK(parser.Finish(&parsed));
+  auto struct_array = std::static_pointer_cast<StructArray>(parsed);
+  for (size_t i = 0; i != fields.size(); ++i) {
+    auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
+    auto column = struct_array->GetFieldByName(fields[i]->name());
+    // fail the test rather than dereference a null pointer when the name is unknown
+    ASSERT_TRUE(column != nullptr) << "no parsed column named " << fields[i]->name();
+    AssertRawArraysEqual(*column_expected, *column);
+  }
+}
+
+// TODO(bkietz) parameterize (at least some of) these tests over UnexpectedFieldBehavior
+
+// Scalars parsed against a schema naming every field: numbers stay unconverted
+// (expected as strings), booleans and strings are checked as-is.
+TEST(BlockParserWithSchema, Basics) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema =
+      schema({field("hello", float64()), field("world", boolean()), field("yo", utf8())});
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {
+      field("hello", utf8()), field("world", boolean()), field("yo", utf8())};
+  const std::vector<std::string> raw_columns = {
+      "[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]", "[false, null, null, true]",
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]"};
+  AssertParseColumns(parse_options, scalars_only_src(), raw_fields, raw_columns);
+}
+
+// Empty input with an explicit schema: every schema column is expected to be
+// present and empty.
+TEST(BlockParserWithSchema, Empty) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema =
+      schema({field("hello", float64()), field("world", boolean()), field("yo", utf8())});
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {
+      field("hello", utf8()), field("world", boolean()), field("yo", utf8())};
+  const std::vector<std::string> raw_columns = {"[]", "[]", "[]"};
+  AssertParseColumns(parse_options, "", raw_fields, raw_columns);
+}
+
+// The schema omits "world"; with UnexpectedFieldBehavior::Ignore the remaining
+// two columns must still parse to the expected values.
+TEST(BlockParserWithSchema, SkipFieldsOutsideSchema) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema = schema({field("hello", float64()), field("yo", utf8())});
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {field("hello", utf8()),
+                                                          field("yo", utf8())};
+  const std::vector<std::string> raw_columns = {
+      "[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]",
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]"};
+  AssertParseColumns(parse_options, scalars_only_src(), raw_fields, raw_columns);
+}
+
+// Column "a" is declared int32 but the second row holds a boolean, so
+// parsing is expected to fail with Invalid.
+TEST(BlockParserWithSchema, FailOnInconvertible) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema = schema({field("a", int32())});
+
+  std::shared_ptr<Buffer> json_buffer;
+  ASSERT_OK(MakeBuffer("{\"a\":0}\n{\"a\":true}", &json_buffer));
+  BlockParser block_parser(parse_options, json_buffer);
+  ASSERT_RAISES(Invalid, block_parser.Parse(json_buffer));
+}
+
+// Nested input against an explicit schema: list items and struct children are
+// expected unconverted, i.e. numbers appear as strings.
+TEST(BlockParserWithSchema, Nested) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema =
+      schema({field("yo", utf8()), field("arr", list(int32())),
+              field("nuf", struct_({field("ps", int32())}))});
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {
+      field("yo", utf8()), field("arr", list(utf8())),
+      field("nuf", struct_({field("ps", utf8())}))};
+  const std::vector<std::string> raw_columns = {
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]",
+      R"([["1", "2", "3"], ["2"], [], null])",
+      R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"};
+  AssertParseColumns(parse_options, nested_src(), raw_fields, raw_columns);
+}
+
+// The input stops in the middle of an object, so parsing is expected to fail
+// with Invalid.
+TEST(BlockParserWithSchema, FailOnIncompleteJson) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  parse_options.explicit_schema = schema({field("a", int32())});
+
+  std::shared_ptr<Buffer> json_buffer;
+  ASSERT_OK(MakeBuffer("{\"a\":0, \"b\"", &json_buffer));
+  BlockParser block_parser(parse_options, json_buffer);
+  ASSERT_RAISES(Invalid, block_parser.Parse(json_buffer));
+}
+
+// No explicit schema: with InferType the scalar columns are discovered from
+// the input and checked in their unconverted form.
+TEST(BlockParser, Basics) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {
+      field("hello", utf8()), field("world", boolean()), field("yo", utf8())};
+  const std::vector<std::string> raw_columns = {
+      "[\"3.5\", \"3.2\", \"3.4\", \"0.0\"]", "[false, null, null, true]",
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]"};
+  AssertParseColumns(parse_options, scalars_only_src(), raw_fields, raw_columns);
+}
+
+// No explicit schema: with InferType the list and struct columns are
+// discovered from the input and checked in their unconverted form.
+TEST(BlockParser, Nested) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  const std::vector<std::shared_ptr<Field>> raw_fields = {
+      field("yo", utf8()), field("arr", list(utf8())),
+      field("nuf", struct_({field("ps", utf8())}))};
+  const std::vector<std::string> raw_columns = {
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]",
+      R"([["1", "2", "3"], ["2"], [], null])",
+      R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"};
+  AssertParseColumns(parse_options, nested_src(), raw_fields, raw_columns);
+}
+
+/// Parse `src_str` into a RecordBatch with ParseOne and check its columns.
+///
+/// \param options parse options handed to ParseOne
+/// \param src_str JSON text to parse
+/// \param fields name and expected type of each column to check
+/// \param columns_json expected content of each column, as JSON understood by
+///        ArrayFromJSON; must be parallel to `fields`
+///
+/// Columns are looked up by name and compared with AssertArraysEqual. A column
+/// that is missing from the batch is reported as a test failure.
+void AssertParseOne(ParseOptions options, string_view src_str,
+                    const std::vector<std::shared_ptr<Field>>& fields,
+                    const std::vector<std::string>& columns_json) {
+  // guard against indexing columns_json out of bounds below
+  ASSERT_EQ(fields.size(), columns_json.size());
+  std::shared_ptr<Buffer> src;
+  ASSERT_OK(MakeBuffer(src_str, &src));
+  std::shared_ptr<RecordBatch> parsed;
+  ASSERT_OK(ParseOne(options, src, &parsed));
+  for (size_t i = 0; i != fields.size(); ++i) {
+    auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
+    auto column = parsed->GetColumnByName(fields[i]->name());
+    // fail the test rather than dereference a null pointer when the name is unknown
+    ASSERT_TRUE(column != nullptr) << "no parsed column named " << fields[i]->name();
+    AssertArraysEqual(*column_expected, *column);
+  }
+}
+
+// ParseOne yields converted columns: "hello" is expected as float64 rather
+// than as the raw number strings.
+TEST(ParseOne, Basics) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  const std::vector<std::shared_ptr<Field>> converted_fields = {
+      field("hello", float64()), field("world", boolean()), field("yo", utf8())};
+  const std::vector<std::string> converted_columns = {
+      "[3.5, 3.2, 3.4, 0.0]", "[false, null, null, true]",
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]"};
+  AssertParseOne(parse_options, scalars_only_src(), converted_fields, converted_columns);
+}
+
+// ParseOne on nested input: list items and struct children are expected as
+// int64 rather than as the raw number strings.
+TEST(ParseOne, Nested) {
+  auto parse_options = ParseOptions::Defaults();
+  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+
+  const std::vector<std::shared_ptr<Field>> converted_fields = {
+      field("yo", utf8()), field("arr", list(int64())),
+      field("nuf", struct_({field("ps", int64())}))};
+  const std::vector<std::string> converted_columns = {
+      "[\"thing\", null, \"\xe5\xbf\x8d\", null]", R"([[1, 2, 3], [2], [], null])",
+      R"([{"ps":null}, null, {"ps":78}, {"ps":90}])"};
+  AssertParseOne(parse_options, nested_src(), converted_fields, converted_columns);
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/parser.cc b/cpp/src/arrow/json/parser.cc
new file mode 100644
index 0000000..6182495
--- /dev/null
+++ b/cpp/src/arrow/json/parser.cc
@@ -0,0 +1,1034 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/parser.h"
+
+#include <algorithm>
+#include <cstdio>
+#include <limits>
+#include <sstream>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include <rapidjson/error/en.h>
+#include <rapidjson/reader.h>
+
+#include "arrow/array.h"
+#include "arrow/buffer-builder.h"
+#include "arrow/builder.h"
+#include "arrow/csv/converter.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/stl.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace json {
+
+using internal::checked_cast;
+using util::string_view;
+
+template <typename... T>
+Status ParseError(T&&... t) {
+ return Status::Invalid("JSON parse error: ", std::forward<T>(t)...);
+}
+
+Status KindChangeError(Kind::type from, Kind::type to) {
+ auto from_name = Tag(from)->value(0);
+ auto to_name = Tag(to)->value(0);
+ return ParseError("A column changed from ", from_name, " to ", to_name);
+}
+
+/// A StringBuilder look-alike which writes characters into a caller-provided
+/// buffer and never resizes it; only the offsets are allocated from the pool.
+/// Nulls cannot be appended. The caller must make sure appended strings fit
+/// into the buffer: this is only verified by a debug check.
+class UnsafeStringBuilder {
+ public:
+  UnsafeStringBuilder(MemoryPool* pool, const std::shared_ptr<Buffer>& buffer)
+      : offsets_builder_(pool), values_buffer_(buffer) {
+    DCHECK_NE(values_buffer_, nullptr);
+  }
+
+  /// Record the start offset of `str` and copy its bytes behind the
+  /// previously appended values.
+  Status Append(string_view str) {
+    const auto byte_count = str.size();
+    DCHECK_LE(static_cast<int64_t>(byte_count), capacity() - values_end_);
+    RETURN_NOT_OK(AppendNextOffset());
+    auto destination = values_buffer_->mutable_data() + values_end_;
+    std::memcpy(destination, str.data(), byte_count);
+    ++length_;
+    values_end_ += byte_count;
+    return Status::OK();
+  }
+
+  // Builder may not be reused after Finish()
+  /// Emit a utf8 array backed by the offsets built so far and the values
+  /// buffer. If `values_length` is given it receives the number of bytes used.
+  Status Finish(std::shared_ptr<Array>* out, int64_t* values_length = nullptr) && {
+    // terminating offset: the end of the last string
+    RETURN_NOT_OK(AppendNextOffset());
+    if (values_length != nullptr) {
+      *values_length = values_end_;
+    }
+    std::shared_ptr<Buffer> offsets;
+    RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
+    // no validity bitmap and a null count of zero: nulls are never appended
+    *out = MakeArray(
+        ArrayData::Make(utf8(), length_, {nullptr, offsets, values_buffer_}, 0));
+    return Status::OK();
+  }
+
+  /// number of strings appended so far
+  int64_t length() { return length_; }
+
+  /// size in bytes of the values buffer
+  int64_t capacity() { return values_buffer_->size(); }
+
+  /// bytes of the values buffer not yet written to
+  int64_t remaining_capacity() { return values_buffer_->size() - values_end_; }
+
+ private:
+  /// append the current end of the written values as the next offset
+  Status AppendNextOffset() {
+    const auto next_offset = static_cast<int32_t>(values_end_);
+    return offsets_builder_.Append(next_offset);
+  }
+
+  int64_t length_ = 0;
+  int64_t values_end_ = 0;
+  TypedBufferBuilder<int32_t> offsets_builder_;
+  std::shared_ptr<Buffer> values_buffer_;
+};
+
+/// A stack of bitsets kept in one contiguous std::vector<bool>. Only the
+/// topmost bitset is accessible; its bits can be read and written but its
+/// size is fixed when it is pushed.
+class BitsetStack {
+ public:
+  using reference = typename std::vector<bool>::reference;
+
+  /// Push a new bitset holding `size` bits, all initialized to `value`.
+  void Push(int size, bool value) {
+    const int start = bit_count();
+    offsets_.push_back(start);
+    bits_.resize(start + size, value);
+  }
+
+  /// Number of bits in the topmost bitset.
+  int TopSize() const { return bit_count() - top_offset(); }
+
+  /// Discard the topmost bitset.
+  void Pop() {
+    const int start = top_offset();
+    offsets_.pop_back();
+    bits_.resize(start);
+  }
+
+  /// Access bit `i` of the topmost bitset.
+  reference operator[](int i) { return bits_[top_offset() + i]; }
+
+  bool operator[](int i) const { return bits_[top_offset() + i]; }
+
+ private:
+  int bit_count() const { return static_cast<int>(bits_.size()); }
+  /// position in bits_ at which the topmost bitset begins
+  int top_offset() const { return offsets_.back(); }
+  std::vector<bool> bits_;
+  std::vector<int> offsets_;
+};
+
+/// \brief ArrayBuilder for parsed but unconverted arrays, specialized per JSON Kind
+template <Kind::type>
+class RawArrayBuilder;  // declaration only; the primary template is never defined
+
+/// \brief compact handle identifying a RawArrayBuilder
+///
+/// HandlerBase owns the RawArrayBuilders, so a handle only needs to carry the
+/// builder's index (uint32_t) rather than a full pointer.
+/// The handle additionally records the json kind and nullability of the
+/// builder, which lets both be inspected without dereferencing.
+struct BuilderPtr {
+  /// default construction yields a copy of BuilderPtr::null
+  BuilderPtr() : BuilderPtr(BuilderPtr::null) {}
+  BuilderPtr(Kind::type k, uint32_t i, bool n) : index(i), kind(k), nullable(n) {}
+
+  BuilderPtr(const BuilderPtr&) = default;
+  BuilderPtr& operator=(const BuilderPtr&) = default;
+  BuilderPtr(BuilderPtr&&) = default;
+  BuilderPtr& operator=(BuilderPtr&&) = default;
+
+  // position of the builder within the arena for its kind,
+  // EXCEPT for kind == Kind::kNull where this is the builder's length
+  // (nulls are trivial, so no arena is allocated for them)
+  // FIXME(bkietz) GCC is emitting conversion errors for the bitfields
+  uint32_t index;  // : 28;
+  Kind::type kind;
+  bool nullable;
+
+  /// two handles are equal if index and kind agree; nullable is not compared
+  bool operator==(BuilderPtr other) const {
+    return index == other.index && kind == other.kind;
+  }
+
+  bool operator!=(BuilderPtr other) const { return !(other == *this); }
+
+  /// true unless this handle equals BuilderPtr::null
+  operator bool() const { return !(*this == null); }
+
+  bool operator!() const { return *this == null; }
+
+  static const BuilderPtr null;
+};
+
+const BuilderPtr BuilderPtr::null(Kind::kNull, 0, false);
+
+/// Builder for boolean columns: one buffer of values plus a validity bitmap.
+template <>
+class RawArrayBuilder<Kind::kBoolean> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool)
+      : data_builder_(pool), null_bitmap_builder_(pool) {}
+
+  Status Append(bool value) { return AppendSlot(value, true); }
+
+  /// a null occupies a slot whose value bit is false
+  Status AppendNull() { return AppendSlot(false, false); }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(data_builder_.Append(count, false));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  Status Finish(std::shared_ptr<Array>* out) {
+    // length and null count are read before Finish is called on the builders
+    const auto array_length = length();
+    const auto array_null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> values, validity;
+    RETURN_NOT_OK(data_builder_.Finish(&values));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));
+    auto array_data =
+        ArrayData::Make(boolean(), array_length, {validity, values}, array_null_count);
+    *out = MakeArray(array_data);
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  /// append one value bit followed by one validity bit
+  Status AppendSlot(bool value, bool is_valid) {
+    RETURN_NOT_OK(data_builder_.Append(value));
+    return null_bitmap_builder_.Append(is_valid);
+  }
+
+  TypedBufferBuilder<bool> data_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+/// \brief builder shared by string columns and unconverted number columns
+///
+/// Only an index is stored per element; the characters themselves live in
+/// one StringArray common to all columns, and the index refers into it.
+/// Building is therefore cheap (no per-value character allocation) at the
+/// price of strided access when reading.
+///
+/// When building completes, the indices are paired with the character
+/// storage as a DictionaryArray, a convenient container for indices
+/// referring into another array.
+class ScalarBuilder {
+ public:
+  explicit ScalarBuilder(MemoryPool* pool)
+      : data_builder_(pool), null_bitmap_builder_(pool) {}
+
+  Status Append(int32_t index) { return AppendSlot(index, true); }
+
+  /// a null occupies a slot holding index 0
+  Status AppendNull() { return AppendSlot(0, false); }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(data_builder_.Append(count, 0));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  /// Emit the indices as an int32 array (not yet paired with the storage).
+  Status Finish(std::shared_ptr<Array>* out) {
+    // length and null count are read before Finish is called on the builders
+    const auto array_length = length();
+    const auto array_null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> indices, validity;
+    RETURN_NOT_OK(data_builder_.Finish(&indices));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));
+    auto array_data =
+        ArrayData::Make(int32(), array_length, {validity, indices}, array_null_count);
+    *out = MakeArray(array_data);
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+  // TODO(bkietz) track total length of bytes for later simpler allocation
+
+ private:
+  /// append one index followed by one validity bit
+  Status AppendSlot(int32_t index, bool is_valid) {
+    RETURN_NOT_OK(data_builder_.Append(index));
+    return null_bitmap_builder_.Append(is_valid);
+  }
+
+  TypedBufferBuilder<int32_t> data_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+template <>
+class RawArrayBuilder<Kind::kNumber> : public ScalarBuilder {  // numbers are built exactly like ScalarBuilder; no members are added
+ public:
+ using ScalarBuilder::ScalarBuilder;  // inherit the ScalarBuilder(MemoryPool*) constructor
+};
+
+template <>
+class RawArrayBuilder<Kind::kString> : public ScalarBuilder {  // strings are built exactly like ScalarBuilder; no members are added
+ public:
+ using ScalarBuilder::ScalarBuilder;  // inherit the ScalarBuilder(MemoryPool*) constructor
+};
+
+/// Builder for list columns: tracks offsets and validity itself and refers to
+/// the builder of its values through a BuilderPtr.
+template <>
+class RawArrayBuilder<Kind::kArray> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool)
+      : offset_builder_(pool), null_bitmap_builder_(pool) {}
+
+  /// Append a valid list whose `child_length` values were already appended
+  /// to the value builder.
+  Status Append(int32_t child_length) {
+    // the list starts where the previous one ended
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    offset_ += child_length;
+    return null_bitmap_builder_.Append(true);
+  }
+
+  /// a null list is recorded with the current offset and no values
+  Status AppendNull() {
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    return null_bitmap_builder_.Append(false);
+  }
+
+  Status AppendNull(int64_t count) {
+    RETURN_NOT_OK(offset_builder_.Append(count, offset_));
+    return null_bitmap_builder_.Append(count, false);
+  }
+
+  /// Emit a ListArray; the values are finished through `handler`.
+  template <typename HandlerBase>
+  Status Finish(HandlerBase& handler, std::shared_ptr<Array>* out) {
+    // terminating offset: the end of the last list
+    RETURN_NOT_OK(offset_builder_.Append(offset_));
+    const auto list_length = length();
+    const auto list_null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> list_offsets, validity;
+    RETURN_NOT_OK(offset_builder_.Finish(&list_offsets));
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));
+
+    std::shared_ptr<Array> items;
+    RETURN_NOT_OK(handler.Finish(value_builder_, &items));
+    // the item field carries the json kind of the values as metadata
+    auto item_field =
+        field("item", items->type(), value_builder_.nullable, Tag(value_builder_.kind));
+    auto list_type = list(item_field);
+    *out = MakeArray(ArrayData::Make(list_type, list_length, {validity, list_offsets},
+                                     {items->data()}, list_null_count));
+    return Status::OK();
+  }
+
+  BuilderPtr value_builder() const { return value_builder_; }
+
+  void value_builder(BuilderPtr builder) { value_builder_ = builder; }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  BuilderPtr value_builder_ = BuilderPtr::null;
+  int32_t offset_ = 0;
+  TypedBufferBuilder<int32_t> offset_builder_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+/// Builder for struct columns: tracks validity itself and refers to the
+/// builders of its fields through BuilderPtrs, addressable by name or index.
+template <>
+class RawArrayBuilder<Kind::kObject> {
+ public:
+  explicit RawArrayBuilder(MemoryPool* pool) : null_bitmap_builder_(pool) {}
+
+  Status Append() { return null_bitmap_builder_.Append(true); }
+
+  Status AppendNull() { return null_bitmap_builder_.Append(false); }
+
+  Status AppendNull(int64_t count) { return null_bitmap_builder_.Append(count, false); }
+
+  /// index of the field called `name`, or -1 if there is none
+  int GetFieldIndex(const std::string& name) const {
+    const auto found = name_to_index_.find(name);
+    return found == name_to_index_.end() ? -1 : found->second;
+  }
+
+  /// Register a field builder under `name` and return the index assigned to it.
+  int AddField(std::string name, BuilderPtr builder) {
+    const auto new_index = num_fields();
+    field_builders_.push_back(builder);
+    name_to_index_.emplace(std::move(name), new_index);
+    return new_index;
+  }
+
+  int num_fields() const { return static_cast<int>(field_builders_.size()); }
+
+  BuilderPtr field_builder(int index) const { return field_builders_[index]; }
+
+  void field_builder(int index, BuilderPtr builder) { field_builders_[index] = builder; }
+
+  /// Emit a StructArray; each child is finished through `handler`.
+  template <typename HandlerBase>
+  Status Finish(HandlerBase& handler, std::shared_ptr<Array>* out) {
+    const auto struct_length = length();
+    const auto struct_null_count = null_bitmap_builder_.false_count();
+    std::shared_ptr<Buffer> validity;
+    RETURN_NOT_OK(null_bitmap_builder_.Finish(&validity));
+
+    const int field_count = num_fields();
+
+    // invert name_to_index_ so that names can be looked up by field position
+    std::vector<string_view> names_by_index(field_count);
+    for (const auto& entry : name_to_index_) {
+      names_by_index[entry.second] = entry.first;
+    }
+
+    std::vector<std::shared_ptr<Field>> struct_fields(field_count);
+    std::vector<std::shared_ptr<ArrayData>> children(field_count);
+    for (int i = 0; i < field_count; ++i) {
+      const BuilderPtr child_builder = field_builders_[i];
+      std::shared_ptr<Array> child;
+      RETURN_NOT_OK(handler.Finish(child_builder, &child));
+      children[i] = child->data();
+      // each field carries the json kind of its builder as metadata
+      struct_fields[i] = field(names_by_index[i].to_string(), child->type(),
+                               child_builder.nullable, Tag(child_builder.kind));
+    }
+
+    *out = MakeArray(ArrayData::Make(struct_(std::move(struct_fields)), struct_length,
+                                     {validity}, std::move(children),
+                                     struct_null_count));
+    return Status::OK();
+  }
+
+  int64_t length() { return null_bitmap_builder_.length(); }
+
+ private:
+  std::vector<BuilderPtr> field_builders_;
+  std::unordered_map<std::string, int> name_to_index_;
+  TypedBufferBuilder<bool> null_bitmap_builder_;
+};
+
+/// Three implementations are provided for BlockParser::Impl, one for each
+/// UnexpectedFieldBehavior. However most of the logic is identical in each
+/// case, so the majority of the implementation is in this base class
+class HandlerBase : public BlockParser::Impl,
+ public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, HandlerBase> {
+ public:
+  /// Resolve a BuilderPtr to the RawArrayBuilder it designates in the arena
+  /// for `kind`. Not available for Kind::kNull, which has no builders.
+  template <Kind::type kind>
+  typename std::enable_if<kind != Kind::kNull, RawArrayBuilder<kind>*>::type Cast(
+      BuilderPtr builder) {
+    DCHECK_EQ(builder.kind, kind);
+    auto* first_builder = arena<kind>().data();
+    return first_builder + builder.index;
+  }
+
+ /// Accessor for the stored Status (status_), which every rapidjson callback of this class assigns
+ Status Error() { return status_; }
+
+  /// \defgroup rapidjson-handler-interface functions expected by rapidjson::Reader
+  ///
+  /// Each callback stores the Status of the corresponding operation in status_
+  /// and returns whether it succeeded.
+  ///
+  /// bool Key(const char* data, rapidjson::SizeType size, ...) is omitted since
+  /// the behavior varies greatly between UnexpectedFieldBehaviors
+  ///
+  /// @{
+  bool Null() { return (status_ = AppendNull()).ok(); }
+
+  bool Bool(bool value) { return (status_ = AppendBool(value)).ok(); }
+
+  bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+    const string_view number_text(data, size);
+    return (status_ = AppendScalar<Kind::kNumber>(number_text)).ok();
+  }
+
+  bool String(const char* data, rapidjson::SizeType size, ...) {
+    const string_view string_text(data, size);
+    return (status_ = AppendScalar<Kind::kString>(string_text)).ok();
+  }
+
+  bool StartObject() { return (status_ = StartObjectImpl()).ok(); }
+
+  bool EndObject(...) { return (status_ = EndObjectImpl()).ok(); }
+
+  bool StartArray() { return (status_ = StartArrayImpl()).ok(); }
+
+  bool EndArray(rapidjson::SizeType size) {
+    return (status_ = EndArrayImpl(size)).ok();
+  }
+  /// @}
+
+  /// \brief Add one field builder to the root struct builder for every field
+  /// of the expected Schema, taking over each field's nullability
+  Status SetSchema(const Schema& s) {
+    // only the root struct builder may exist at this point
+    DCHECK_EQ(arena<Kind::kObject>().size(), 1);
+    constexpr int64_t kNoLeadingNulls = 0;
+    for (const auto& schema_field : s.fields()) {
+      BuilderPtr column_builder;
+      RETURN_NOT_OK(MakeBuilder(*schema_field->type(), kNoLeadingNulls, &column_builder));
+      column_builder.nullable = schema_field->nullable();
+      Cast<Kind::kObject>(builder_)->AddField(schema_field->name(), column_builder);
+    }
+    return Status::OK();
+  }
+
+  /// Finish the builder designated by `builder` into an array, dispatching on
+  /// its kind.
+  Status Finish(BuilderPtr builder, std::shared_ptr<Array>* out) {
+    const auto builder_kind = builder.kind;
+
+    if (builder_kind == Kind::kNull) {
+      // null builders have no arena entry: index holds their length
+      const auto null_length = static_cast<int64_t>(builder.index);
+      *out = std::make_shared<NullArray>(null_length);
+      return Status::OK();
+    }
+    if (builder_kind == Kind::kBoolean) {
+      return Cast<Kind::kBoolean>(builder)->Finish(out);
+    }
+    if (builder_kind == Kind::kNumber) {
+      return FinishScalar(Cast<Kind::kNumber>(builder), out);
+    }
+    if (builder_kind == Kind::kString) {
+      return FinishScalar(Cast<Kind::kString>(builder), out);
+    }
+    if (builder_kind == Kind::kArray) {
+      return Cast<Kind::kArray>(builder)->Finish(*this, out);
+    }
+    if (builder_kind == Kind::kObject) {
+      return Cast<Kind::kObject>(builder)->Finish(*this, out);
+    }
+    return Status::NotImplemented("invalid builder kind");
+  }
+
+  /// Finish the shared scalar storage first, since scalar columns refer to it,
+  /// then finish the current builder (builder_) into `parsed`.
+  Status Finish(std::shared_ptr<Array>* parsed) override {
+    Status storage_status = std::move(scalar_values_builder_).Finish(&scalar_values_);
+    if (!storage_status.ok()) {
+      return storage_status;
+    }
+    return Finish(builder_, parsed);
+  }
+
+ int32_t num_rows() override { return num_rows_; }  // returns num_rows_, the counter DoParse advances per parsed value
+
+ protected:
+ HandlerBase(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+ : pool_(pool),
+ builder_(Kind::kObject, 0, false),  // root builder: object arena index 0, not nullable
+ scalar_values_builder_(pool, scalar_storage) {
+ arena<Kind::kObject>().emplace_back(pool_);  // create the struct builder at index 0 which builder_ designates
+ }
+
+  /// Finish a string or number column: its indices become a DictionaryArray
+  /// whose dictionary is the shared scalar storage (scalar_values_).
+  Status FinishScalar(ScalarBuilder* builder, std::shared_ptr<Array>* out) {
+    std::shared_ptr<Array> scalar_indices;
+    RETURN_NOT_OK(builder->Finish(&scalar_indices));
+    const auto dictionary_type = dictionary(int32(), scalar_values_);
+    return DictionaryArray::FromArrays(dictionary_type, scalar_indices, out);
+  }
+
+  /// Parse consecutive JSON values from `json` in place, feeding rapidjson's
+  /// events to `handler` and counting one row per successfully parsed value.
+  ///
+  /// Returns OK once rapidjson reports that no further document is present,
+  /// the handler's error if the handler terminated parsing, a parse error for
+  /// any other rapidjson error, and Invalid if kMaxParserNumRows is reached.
+  template <typename Handler>
+  Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
+    constexpr auto parse_flags =
+        rapidjson::kParseInsituFlag | rapidjson::kParseIterativeFlag |
+        rapidjson::kParseStopWhenDoneFlag | rapidjson::kParseNumbersAsStringsFlag;
+    auto json_data = reinterpret_cast<char*>(json->mutable_data());
+    rapidjson::GenericInsituStringStream<rapidjson::UTF8<>> ss(json_data);
+    rapidjson::Reader reader;
+
+    while (num_rows_ != kMaxParserNumRows) {
+      // parse a single line of JSON
+      const auto result = reader.Parse<parse_flags>(ss, handler);
+      const auto code = result.Code();
+      if (code == rapidjson::kParseErrorDocumentEmpty) {
+        // parsed all objects, finish
+        return Status::OK();
+      }
+      if (code == rapidjson::kParseErrorTermination) {
+        // handler emitted an error
+        return handler.Error();
+      }
+      if (code != rapidjson::kParseErrorNone) {
+        // rapidjson emitted an error
+        return ParseError(rapidjson::GetParseError_En(code));
+      }
+      // success: count the row and parse the next object
+      ++num_rows_;
+    }
+    return Status::Invalid("Exceeded maximum rows");
+  }
+
+  /// construct a builder of statically defined kind in its arena, make
+  /// `builder` designate it (nullable) and start it off with `leading_nulls`
+  /// nulls
+  template <Kind::type kind>
+  Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) {
+    auto& builders = arena<kind>();
+    // the new builder is placed at the end of the arena
+    const auto new_index = static_cast<uint32_t>(builders.size());
+    *builder = BuilderPtr(kind, new_index, true);
+    builders.emplace_back(pool_);
+    return Cast<kind>(*builder)->AppendNull(leading_nulls);
+  }
+
+  /// construct a builder of whatever kind corresponds to a DataType
+  ///
+  /// Nested types recurse: a list gets a value builder without leading nulls,
+  /// a struct gets one builder per child, each with the same number of leading
+  /// nulls as the struct itself. Child builders take over the nullability of
+  /// their field.
+  Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) {
+    Kind::type type_kind;
+    RETURN_NOT_OK(KindForType(t, &type_kind));
+
+    if (type_kind == Kind::kNull) {
+      // no arena entry for nulls: the length is stored in the index
+      *builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true);
+      return Status::OK();
+    }
+    if (type_kind == Kind::kBoolean) {
+      return MakeBuilder<Kind::kBoolean>(leading_nulls, builder);
+    }
+    if (type_kind == Kind::kNumber) {
+      return MakeBuilder<Kind::kNumber>(leading_nulls, builder);
+    }
+    if (type_kind == Kind::kString) {
+      return MakeBuilder<Kind::kString>(leading_nulls, builder);
+    }
+    if (type_kind == Kind::kArray) {
+      RETURN_NOT_OK(MakeBuilder<Kind::kArray>(leading_nulls, builder));
+      const auto& as_list = static_cast<const ListType&>(t);
+      BuilderPtr item_builder;
+      RETURN_NOT_OK(MakeBuilder(*as_list.value_type(), 0, &item_builder));
+      item_builder.nullable = as_list.value_field()->nullable();
+      Cast<Kind::kArray>(*builder)->value_builder(item_builder);
+      return Status::OK();
+    }
+    if (type_kind == Kind::kObject) {
+      RETURN_NOT_OK(MakeBuilder<Kind::kObject>(leading_nulls, builder));
+      const auto& as_struct = static_cast<const StructType&>(t);
+      for (const auto& child_field : as_struct.children()) {
+        BuilderPtr child_builder;
+        RETURN_NOT_OK(MakeBuilder(*child_field->type(), leading_nulls, &child_builder));
+        child_builder.nullable = child_field->nullable();
+        // resolve the struct builder anew on every iteration: the pointer is
+        // not kept across the recursive MakeBuilder call above
+        Cast<Kind::kObject>(*builder)->AddField(child_field->name(), child_builder);
+      }
+      return Status::OK();
+    }
+    return Status::NotImplemented("invalid builder type");
+  }
+
+ /// \defgroup handlerbase-append-methods append non-nested values
+ ///
+ /// These methods act on builder_
+ /// @{
+
+  /// Append a null to the current builder (builder_).
+  ///
+  /// Fails with a parse error if the current builder is not nullable.
+  /// Kind::kNull builders have no arena entry: their length is stored inline
+  /// in BuilderPtr::index, so the incremented BuilderPtr has to be written
+  /// back to whichever builder holds it. A null struct also appends a null to
+  /// each of its children so that they keep the struct's length.
+  Status AppendNull() {
+    if (ARROW_PREDICT_FALSE(!builder_.nullable)) {
+      return ParseError("a required field was null");
+    }
+    switch (builder_.kind) {
+      case Kind::kNull: {
+        // increment null count stored inline
+        // update the parent, since changing builder_ doesn't affect parent
+        auto parent = builder_stack_.back();
+        if (parent.kind == Kind::kArray) {
+          auto list_builder = Cast<Kind::kArray>(parent);
+          DCHECK_EQ(list_builder->value_builder(), builder_);
+          builder_.index += 1;
+          list_builder->value_builder(builder_);
+        } else {
+          auto struct_builder = Cast<Kind::kObject>(parent);
+          DCHECK_EQ(struct_builder->field_builder(field_index_), builder_);
+          builder_.index += 1;
+          struct_builder->field_builder(field_index_, builder_);
+        }
+        return Status::OK();
+      }
+      case Kind::kBoolean:
+        return Cast<Kind::kBoolean>(builder_)->AppendNull();
+      case Kind::kNumber:
+        return Cast<Kind::kNumber>(builder_)->AppendNull();
+      case Kind::kString:
+        return Cast<Kind::kString>(builder_)->AppendNull();
+      case Kind::kArray:
+        return Cast<Kind::kArray>(builder_)->AppendNull();
+      case Kind::kObject: {
+        auto root = builder_;
+        auto struct_builder = Cast<Kind::kObject>(builder_);
+        RETURN_NOT_OK(struct_builder->AppendNull());
+        for (int i = 0; i != struct_builder->num_fields(); ++i) {
+          auto child = struct_builder->field_builder(i);
+          if (child.kind == Kind::kNull) {
+            // The inline count of this child is held by struct_builder at
+            // slot i. That is not what builder_stack_.back() / field_index_
+            // designate here (this struct was never pushed), so the Kind::kNull
+            // case above would write to the wrong slot. Update it directly.
+            if (ARROW_PREDICT_FALSE(!child.nullable)) {
+              return ParseError("a required field was null");
+            }
+            child.index += 1;
+            struct_builder->field_builder(i, child);
+            continue;
+          }
+          builder_ = child;
+          RETURN_NOT_OK(AppendNull());
+        }
+        builder_ = root;
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented("invalid builder Kind");
+    }
+  }
+
+  /// Append a boolean to the current builder, which must be a boolean
+  /// builder; otherwise a kind change error is returned.
+  Status AppendBool(bool value) {
+    const bool is_boolean_builder = builder_.kind == Kind::kBoolean;
+    if (ARROW_PREDICT_FALSE(!is_boolean_builder)) {
+      return IllegallyChangedTo(Kind::kBoolean);
+    }
+    auto boolean_builder = Cast<Kind::kBoolean>(builder_);
+    return boolean_builder->Append(value);
+  }
+
+  /// Append a string or number to the current builder, which must be of the
+  /// given kind; otherwise a kind change error is returned. The column stores
+  /// the position at which the characters are added to the shared storage.
+  template <Kind::type kind>
+  Status AppendScalar(string_view scalar) {
+    const bool kind_matches = builder_.kind == kind;
+    if (ARROW_PREDICT_FALSE(!kind_matches)) {
+      return IllegallyChangedTo(kind);
+    }
+    // the scalar will become the next element of the shared storage
+    const auto storage_index = static_cast<int32_t>(scalar_values_builder_.length());
+    RETURN_NOT_OK(Cast<kind>(builder_)->Append(storage_index));
+    return scalar_values_builder_.Append(scalar);
+  }
+
+ /// @}
+
+  /// Begin an object: the current builder must be a struct builder. All of
+  /// its fields are marked absent, the builder is pushed so that its children
+  /// can be visited, and a valid slot is appended to it.
+  Status StartObjectImpl() {
+    if (ARROW_PREDICT_FALSE(builder_.kind != Kind::kObject)) {
+      return IllegallyChangedTo(Kind::kObject);
+    }
+    auto object_builder = Cast<Kind::kObject>(builder_);
+    const int known_field_count = object_builder->num_fields();
+    // every field counts as absent until its key is encountered
+    absent_fields_stack_.Push(known_field_count, true);
+    PushStacks();
+    return object_builder->Append();
+  }
+
+  /// \brief helper for Key() functions
+  ///
+  /// Looks up `key` in the struct builder on top of the builder stack. If a
+  /// field of that name exists, its builder becomes the current builder, the
+  /// field is marked present and true is returned; otherwise returns false.
+  bool SetFieldBuilder(string_view key) {
+    auto enclosing_struct = Cast<Kind::kObject>(builder_stack_.back());
+    field_index_ = enclosing_struct->GetFieldIndex(std::string(key));
+    const bool unknown_field = field_index_ == -1;
+    if (ARROW_PREDICT_FALSE(unknown_field)) {
+      return false;
+    }
+    absent_fields_stack_[field_index_] = false;
+    builder_ = enclosing_struct->field_builder(field_index_);
+    return true;
+  }
+
+ /// Finish the object whose builder is on top of builder_stack_.
+ ///
+ /// Every field still flagged as absent receives a null; a ParseError is
+ /// returned if such a field is not nullable. Afterwards the absent-field
+ /// flags of this object are popped and the parent's state is restored.
+ Status EndObjectImpl() {
+ auto parent = Cast<Kind::kObject>(builder_stack_.back());
+
+ auto expected_count = absent_fields_stack_.TopSize();
+ // the member field_index_ is used as the loop counter so that it stays in
+ // sync with builder_ while AppendNull() runs
+ for (field_index_ = 0; field_index_ != expected_count; ++field_index_) {
+ if (!absent_fields_stack_[field_index_]) {
+ continue;
+ }
+ builder_ = parent->field_builder(field_index_);
+ if (ARROW_PREDICT_FALSE(!builder_.nullable)) {
+ // note: returns without popping the stacks
+ return ParseError("a required field was absent");
+ }
+ RETURN_NOT_OK(AppendNull());
+ }
+ absent_fields_stack_.Pop();
+ PopStacks();
+ return Status::OK();
+ }
+
+ /// Begin parsing an array into the current builder.
+ ///
+ /// The current builder must be a list builder; it is pushed as the new
+ /// parent and its value builder becomes the current builder. Otherwise the
+ /// result of IllegallyChangedTo(Kind::kArray) is returned.
+ Status StartArrayImpl() {
+   if (ARROW_PREDICT_TRUE(builder_.kind == Kind::kArray)) {
+     PushStacks();
+     // the list builder itself is only appended to in EndArrayImpl
+     builder_ = Cast<Kind::kArray>(builder_)->value_builder();
+     return Status::OK();
+   }
+   return IllegallyChangedTo(Kind::kArray);
+ }
+
+ /// Finish an array of `size` elements: the list builder pushed by
+ /// StartArrayImpl becomes the current builder again and `size` is appended
+ /// to it.
+ Status EndArrayImpl(rapidjson::SizeType size) {
+   PopStacks();
+   // builder_ is the list builder again; the list itself is appended here
+   return Cast<Kind::kArray>(builder_)->Append(size);
+ }
+
+ /// Helper for StartArray and StartObject: saves the current builder and
+ /// its field index on their stacks so the builder's children can be
+ /// visited and parsed. field_index_ is reset to -1 afterwards.
+ void PushStacks() {
+   builder_stack_.push_back(builder_);
+   field_index_stack_.push_back(field_index_);
+   field_index_ = -1;
+ }
+
+ /// Helper for EndArray and EndObject: restores the parent builder and its
+ /// field index from their stacks so parsing of the parent can continue.
+ void PopStacks() {
+   builder_ = builder_stack_.back();
+   builder_stack_.pop_back();
+   field_index_ = field_index_stack_.back();
+   field_index_stack_.pop_back();
+ }
+
+ /// Build the error returned when a value of kind `illegally_changed_to`
+ /// arrives while the current builder has a different kind.
+ Status IllegallyChangedTo(Kind::type illegally_changed_to) {
+ return KindChangeError(builder_.kind, illegally_changed_to);
+ }
+
+ /// Return the vector of builders for `kind`; the numeric value of the
+ /// Kind is the index of that vector within arenas_.
+ template <Kind::type kind>
+ std::vector<RawArrayBuilder<kind>>& arena() {
+ return std::get<static_cast<std::size_t>(kind)>(arenas_);
+ }
+
+ // status recorded by the handlers when they abort parsing by returning false
+ Status status_;
+ MemoryPool* pool_;
+ // one vector of builders per Kind, indexed by the Kind's numeric value
+ // (see arena()); slot 0, which corresponds to Kind::kNull, is an empty tuple
+ std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>,
+ std::vector<RawArrayBuilder<Kind::kNumber>>,
+ std::vector<RawArrayBuilder<Kind::kString>>,
+ std::vector<RawArrayBuilder<Kind::kArray>>,
+ std::vector<RawArrayBuilder<Kind::kObject>>>
+ arenas_;
+ // the builder which receives the next parsed value
+ BuilderPtr builder_;
+ // top of this stack is the parent of builder_
+ std::vector<BuilderPtr> builder_stack_;
+ // top of this stack refers to the fields of the highest *StructBuilder*
+ // in builder_stack_ (list builders don't have absent fields)
+ BitsetStack absent_fields_stack_;
+ // index of builder_ within its parent
+ int field_index_;
+ // top of this stack == field_index_
+ std::vector<int> field_index_stack_;
+ // text of the scalars appended through AppendScalar; the number and string
+ // builders store indices into it
+ UnsafeStringBuilder scalar_values_builder_;
+ std::shared_ptr<Array> scalar_values_;
+ int32_t num_rows_ = 0;
+};
+
+template <UnexpectedFieldBehavior>
+class Handler;
+
+/// Handler which aborts parsing as soon as a key is encountered that does
+/// not name a field of the object currently being parsed.
+template <>
+class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
+ public:
+  Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+      : HandlerBase(pool, scalar_storage) {}
+
+  Status Parse(const std::shared_ptr<Buffer>& json) override {
+    return DoParse(*this, json);
+  }
+
+  /// \ingroup rapidjson-handler-interface
+  ///
+  /// Selects the builder of the field named by the key. For an unknown key
+  /// a parse error is stored in status_ and false is returned.
+  bool Key(const char* key, rapidjson::SizeType len, ...) {
+    const bool known_field = SetFieldBuilder(string_view(key, len));
+    if (ARROW_PREDICT_FALSE(!known_field)) {
+      status_ = ParseError("unexpected field");
+    }
+    return known_field;
+  }
+};
+
+// Handler which silently drops fields that are not part of the schema.
+//
+// depth_ counts the objects which are currently open. When an unknown key is
+// seen, skip_depth_ is set to the depth of the object containing that key and
+// every event is swallowed (the handler returns true without forwarding it)
+// until the next Key() or EndObject() at that same depth.
+template <>
+class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
+ public:
+ Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+ : HandlerBase(pool, scalar_storage) {}
+
+ Status Parse(const std::shared_ptr<Buffer>& json) override {
+ return DoParse(*this, json);
+ }
+
+ // the value handlers below forward to HandlerBase unless a field is
+ // currently being skipped
+ bool Null() {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::Null();
+ }
+
+ bool Bool(bool value) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::Bool(value);
+ }
+
+ bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::RawNumber(data, size);
+ }
+
+ bool String(const char* data, rapidjson::SizeType size, ...) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::String(data, size);
+ }
+
+ bool StartObject() {
+ // depth is tracked even while skipping so that the matching EndObject()
+ // can be recognised
+ ++depth_;
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::StartObject();
+ }
+
+ /// \ingroup rapidjson-handler-interface
+ ///
+ /// if an unexpected field is encountered, skip until its value has been consumed
+ bool Key(const char* key, rapidjson::SizeType len, ...) {
+ // a key at the depth where skipping started means the skipped value is over
+ MaybeStopSkipping();
+ if (Skipping()) {
+ return true;
+ }
+ if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
+ return true;
+ }
+ // unknown key: start skipping at the current depth
+ skip_depth_ = depth_;
+ return true;
+ }
+
+ bool EndObject(...) {
+ // checked before depth_ is decremented: the end of the object which
+ // contains the skipped field also ends the skipping
+ MaybeStopSkipping();
+ --depth_;
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::EndObject();
+ }
+
+ bool StartArray() {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::StartArray();
+ }
+
+ bool EndArray(rapidjson::SizeType size) {
+ if (Skipping()) {
+ return true;
+ }
+ return HandlerBase::EndArray(size);
+ }
+
+ private:
+ // true while events belong to the value of an ignored field
+ bool Skipping() { return depth_ >= skip_depth_; }
+
+ // reset skip_depth_ to its "not skipping" value if the current depth is
+ // the depth at which skipping started
+ void MaybeStopSkipping() {
+ if (skip_depth_ == depth_) {
+ skip_depth_ = std::numeric_limits<int>::max();
+ }
+ }
+
+ // number of objects currently open
+ int depth_ = 0;
+ // depth at which skipping started; int max means nothing is being skipped
+ int skip_depth_ = std::numeric_limits<int>::max();
+};
+
+// Handler which adds unexpected fields to the parent object builder.
+//
+// A new field starts out with a null builder (see Key()). Each value handler
+// first calls MaybePromoteFromNull<kind>() so that a null builder is replaced
+// by a builder of the kind of the value which is about to be appended.
+template <>
+class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
+ public:
+ Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
+ : HandlerBase(pool, scalar_storage) {}
+
+ Status Parse(const std::shared_ptr<Buffer>& json) override {
+ return DoParse(*this, json);
+ }
+
+ bool Bool(bool value) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kBoolean>())) {
+ return false;
+ }
+ return HandlerBase::Bool(value);
+ }
+
+ bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) {
+ return false;
+ }
+ return HandlerBase::RawNumber(data, size);
+ }
+
+ bool String(const char* data, rapidjson::SizeType size, ...) {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) {
+ return false;
+ }
+ return HandlerBase::String(data, size);
+ }
+
+ bool StartObject() {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kObject>())) {
+ return false;
+ }
+ return HandlerBase::StartObject();
+ }
+
+ /// \ingroup rapidjson-handler-interface
+ ///
+ /// If an unexpected field is encountered, add a new builder to
+ /// the current parent builder. It is added as a NullBuilder with
+ /// (parent.length - 1) leading nulls. The next value parsed
+ /// will probably trigger promotion of this field from null
+ bool Key(const char* key, rapidjson::SizeType len, ...) {
+ if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
+ return true;
+ }
+ auto struct_builder = Cast<Kind::kObject>(builder_stack_.back());
+ // the parent was already appended to for the current row (StartObjectImpl),
+ // hence length() - 1 rows precede this one
+ auto leading_nulls = static_cast<uint32_t>(struct_builder->length() - 1);
+ // for a null builder the second argument carries the count of nulls
+ builder_ = BuilderPtr(Kind::kNull, leading_nulls, true);
+ field_index_ = struct_builder->AddField(std::string(key, len), builder_);
+ return true;
+ }
+
+ bool StartArray() {
+ if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kArray>())) {
+ return false;
+ }
+ return HandlerBase::StartArray();
+ }
+
+ private:
+ // If the current builder is a null builder, replace it with a new builder
+ // of `kind` and install the new builder in the parent (list or object).
+ // On failure the error is stored in status_.
+ //
+ // return true if a terminal error was encountered
+ template <Kind::type kind>
+ bool MaybePromoteFromNull() {
+ if (ARROW_PREDICT_TRUE(builder_.kind != Kind::kNull)) {
+ return false;
+ }
+ auto parent = builder_stack_.back();
+ if (parent.kind == Kind::kArray) {
+ auto list_builder = Cast<Kind::kArray>(parent);
+ DCHECK_EQ(list_builder->value_builder(), builder_);
+ status_ = MakeBuilder<kind>(builder_.index, &builder_);
+ if (ARROW_PREDICT_FALSE(!status_.ok())) {
+ return true;
+ }
+ // NOTE(review): the parent pointer is re-acquired after MakeBuilder,
+ // presumably because MakeBuilder can grow an arena vector and thereby
+ // invalidate the pointer obtained above - confirm
+ list_builder = Cast<Kind::kArray>(parent);
+ list_builder->value_builder(builder_);
+ } else {
+ auto struct_builder = Cast<Kind::kObject>(parent);
+ DCHECK_EQ(struct_builder->field_builder(field_index_), builder_);
+ status_ = MakeBuilder<kind>(builder_.index, &builder_);
+ if (ARROW_PREDICT_FALSE(!status_.ok())) {
+ return true;
+ }
+ // NOTE(review): re-acquired for the same presumed reason as above
+ struct_builder = Cast<Kind::kObject>(parent);
+ struct_builder->field_builder(field_index_, builder_);
+ }
+ return false;
+ }
+};
+
+/// Construct a parser whose implementation is the Handler specialization
+/// matching options.unexpected_field_behavior.
+///
+/// An explicit schema is mandatory (checked with DCHECK only) unless the
+/// behavior is InferType, for which it is optional.
+BlockParser::BlockParser(MemoryPool* pool, ParseOptions options,
+                         const std::shared_ptr<Buffer>& scalar_storage)
+    : pool_(pool), options_(options) {
+  using Behavior = UnexpectedFieldBehavior;
+  const bool has_schema = options_.explicit_schema != nullptr;
+  DCHECK(options_.unexpected_field_behavior == Behavior::InferType || has_schema);
+  switch (options_.unexpected_field_behavior) {
+    case Behavior::Ignore: {
+      auto ignoring =
+          internal::make_unique<Handler<Behavior::Ignore>>(pool_, scalar_storage);
+      // FIXME(bkietz) move this to an Initialize()
+      ARROW_IGNORE_EXPR(ignoring->SetSchema(*options_.explicit_schema));
+      impl_ = std::move(ignoring);
+      break;
+    }
+    case Behavior::Error: {
+      auto erroring =
+          internal::make_unique<Handler<Behavior::Error>>(pool_, scalar_storage);
+      ARROW_IGNORE_EXPR(erroring->SetSchema(*options_.explicit_schema));
+      impl_ = std::move(erroring);
+      break;
+    }
+    case Behavior::InferType: {
+      auto inferring =
+          internal::make_unique<Handler<Behavior::InferType>>(pool_, scalar_storage);
+      if (has_schema) {
+        ARROW_IGNORE_EXPR(inferring->SetSchema(*options_.explicit_schema));
+      }
+      impl_ = std::move(inferring);
+      break;
+    }
+  }
+}
+
+/// Convenience constructor: delegates to the constructor above, passing
+/// default_memory_pool() as the pool.
+BlockParser::BlockParser(ParseOptions options,
+ const std::shared_ptr<Buffer>& scalar_storage)
+ : BlockParser(default_memory_pool(), options, scalar_storage) {}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/parser.h b/cpp/src/arrow/json/parser.h
new file mode 100644
index 0000000..c001598
--- /dev/null
+++ b/cpp/src/arrow/json/parser.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_PARSER_H
+#define ARROW_JSON_PARSER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "arrow/builder.h"
+#include "arrow/json/options.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/visibility.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatch;
+
+namespace json {
+
+/// The kinds of JSON value distinguished by the parser.
+///
+/// The enumerators' numeric values are used as indices (for example into the
+/// table in Tag()), so their order is significant.
+struct Kind {
+ enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject };
+};
+
+/// Return the metadata tagging a field with JSON kind `k`: a single
+/// "json_kind" entry whose value is the lowercase name of the kind.
+///
+/// The returned reference refers to an element of a function-local static
+/// table which is indexed by the numeric value of `k`.
+inline static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type k) {
+ // order must match the order of the enumerators of Kind::type
+ static std::shared_ptr<const KeyValueMetadata> tags[] = {
+ key_value_metadata({{"json_kind", "null"}}),
+ key_value_metadata({{"json_kind", "boolean"}}),
+ key_value_metadata({{"json_kind", "number"}}),
+ key_value_metadata({{"json_kind", "string"}}),
+ key_value_metadata({{"json_kind", "array"}}),
+ key_value_metadata({{"json_kind", "object"}})};
+ return tags[static_cast<uint8_t>(k)];
+}
+
+/// Determine the JSON kind used to parse values of `type`.
+///
+/// \param[in] type the Arrow type
+/// \param[out] kind receives the kind on success
+/// \return NotImplemented for types which match none of the specific
+/// overloads of the local visitor
+inline static Status KindForType(const DataType& type, Kind::type* kind) {
+ // local visitor dispatched through VisitTypeInline; each overload stores
+ // the kind for one family of types
+ struct {
+ Status Visit(const NullType&) { return SetKind(Kind::kNull); }
+ Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
+ Status Visit(const Number&) { return SetKind(Kind::kNumber); }
+ // XXX should TimeType & DateType be Kind::kNumber or Kind::kString?
+ Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
+ Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
+ Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
+ // XXX should Decimal128Type be Kind::kNumber or Kind::kString?
+ Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
+ // a dictionary type has the kind of the type of its dictionary
+ Status Visit(const DictionaryType& dict_type) {
+ return KindForType(*dict_type.dictionary()->type(), kind_);
+ }
+ Status Visit(const ListType&) { return SetKind(Kind::kArray); }
+ Status Visit(const StructType&) { return SetKind(Kind::kObject); }
+ // fallback for every type not matched above
+ Status Visit(const DataType& not_impl) {
+ return Status::NotImplemented("JSON parsing of ", not_impl);
+ }
+ Status SetKind(Kind::type kind) {
+ *kind_ = kind;
+ return Status::OK();
+ }
+ Kind::type* kind_;
+ } visitor = {kind};
+ return VisitTypeInline(type, &visitor);
+}
+
+// NOTE(review): presumably the maximum number of rows a parser accepts in
+// one block; not referenced in this header - confirm against parser.cc
+constexpr int32_t kMaxParserNumRows = 100000;
+
+/// \class BlockParser
+/// \brief A reusable block-based parser for JSON data
+///
+/// The parser takes a block of newline delimited JSON data and extracts
+/// keys and value pairs, inserting into provided ArrayBuilders.
+/// Parsed data is owned by the
+/// parser, so the original buffer can be discarded after Parse() returns.
+class ARROW_EXPORT BlockParser {
+ public:
+ /// \param[in] pool memory pool handed to the implementation
+ /// \param[in] options parse options; they select the implementation
+ /// \param[in] scalar_storage buffer handed to the implementation
+ BlockParser(MemoryPool* pool, ParseOptions options,
+ const std::shared_ptr<Buffer>& scalar_storage);
+ BlockParser(ParseOptions options, const std::shared_ptr<Buffer>& scalar_storage);
+
+ /// \brief Parse a block of data insitu (destructively)
+ /// \warning The input must be null terminated
+ Status Parse(const std::shared_ptr<Buffer>& json) { return impl_->Parse(json); }
+
+ /// \brief Extract parsed data
+ Status Finish(std::shared_ptr<Array>* parsed) { return impl_->Finish(parsed); }
+
+ /// \brief Return the number of parsed rows
+ int32_t num_rows() const { return impl_->num_rows(); }
+
+ /// Interface of the implementation to which the public methods above
+ /// forward.
+ struct Impl {
+ virtual ~Impl() = default;
+ virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
+ virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
+ virtual int32_t num_rows() = 0;
+ };
+
+ protected:
+ ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
+
+ MemoryPool* pool_;
+ const ParseOptions options_;
+ // created by the constructor
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace json
+} // namespace arrow
+
+#endif // ARROW_JSON_PARSER_H
diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
new file mode 100644
index 0000000..0e147ab
--- /dev/null
+++ b/cpp/src/arrow/json/reader.cc
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/json/reader.h"
+
+#include <unordered_map>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/parsing.h"
+
+namespace arrow {
+namespace json {
+
+using internal::StringConverter;
+
+/// Recover the JSON kind from a "json_kind" metadata tag (the inverse of
+/// Tag()).
+///
+/// The first metadata value is compared against the complete kind names
+/// rather than individual characters, so short or misspelled values are
+/// never indexed out of range or silently mapped to another kind.
+///
+/// \param[in] tag metadata whose first value names the kind
+/// \return the kind; a missing, empty or unrecognized tag is a fatal error
+Kind::type KindFromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
+  if (tag == nullptr || tag->size() == 0) {
+    ARROW_LOG(FATAL) << "missing json_kind tag";
+    return Kind::kNull;
+  }
+  // the names must stay in sync with the values attached by Tag()
+  const std::string kind_name = tag->value(0);
+  if (kind_name == "null") return Kind::kNull;
+  if (kind_name == "boolean") return Kind::kBoolean;
+  if (kind_name == "number") return Kind::kNumber;
+  if (kind_name == "string") return Kind::kString;
+  if (kind_name == "array") return Kind::kArray;
+  if (kind_name == "object") return Kind::kObject;
+  ARROW_LOG(FATAL) << "unrecognized json_kind tag: " << kind_name;
+  return Kind::kNull;
+}
+
+static Status Convert(const std::shared_ptr<DataType>& out_type,
+ std::shared_ptr<Array> in, std::shared_ptr<Array>* out);
+
+// Visitor (dispatched through VisitTypeInline by Convert()) which converts
+// the unconverted array `in` to an array of type `out_type` and stores the
+// result in `*out`. The overload chosen depends on the type being visited.
+struct ConvertImpl {
+ // nulls and booleans are passed through unchanged
+ Status Visit(const NullType&) {
+ *out = in;
+ return Status::OK();
+ }
+ Status Visit(const BooleanType&) {
+ *out = in;
+ return Status::OK();
+ }
+ // handle conversion to types with StringConverter
+ //
+ // `in` is treated as a dictionary array with int32 indices into a string
+ // dictionary; each non-null string is converted with convert_one and the
+ // first failing conversion aborts with Status::Invalid
+ template <typename T>
+ Status ConvertEachWith(const T& t, StringConverter<T>& convert_one) {
+ auto dict_array = static_cast<const DictionaryArray*>(in.get());
+ const StringArray& dict = static_cast<const StringArray&>(*dict_array->dictionary());
+ const Int32Array& indices = static_cast<const Int32Array&>(*dict_array->indices());
+ using Builder = typename TypeTraits<T>::BuilderType;
+ Builder builder(out_type, default_memory_pool());
+ // space for every element is reserved here, which is why the Unsafe
+ // append methods are used below
+ RETURN_NOT_OK(builder.Resize(indices.length()));
+ for (int64_t i = 0; i != indices.length(); ++i) {
+ if (indices.IsNull(i)) {
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ auto repr = dict.GetView(indices.GetView(i));
+ typename StringConverter<T>::value_type value;
+ if (!convert_one(repr.data(), repr.size(), &value)) {
+ return Status::Invalid("Failed of conversion of JSON to ", t, ":", repr);
+ }
+ builder.UnsafeAppend(value);
+ }
+ return builder.Finish(out);
+ }
+ // selected for types for which StringConverter<T> is default constructible
+ template <typename T>
+ Status Visit(const T& t, decltype(StringConverter<T>())* = nullptr) {
+ StringConverter<T> convert_one;
+ return ConvertEachWith(t, convert_one);
+ }
+ // handle conversion to Timestamp
+ Status Visit(const TimestampType& t) {
+ StringConverter<TimestampType> convert_one(out_type);
+ return ConvertEachWith(t, convert_one);
+ }
+ // convert to repr_type, then relabel the resulting data as out_type;
+ // the buffers produced for repr_type are kept as they are
+ Status VisitAs(const std::shared_ptr<DataType>& repr_type) {
+ std::shared_ptr<Array> repr_array;
+ RETURN_NOT_OK(Convert(repr_type, in, &repr_array));
+ auto data = repr_array->data();
+ data->type = out_type;
+ *out = MakeArray(data);
+ return Status::OK();
+ }
+ // handle half explicitly
+ // NOTE(review): the values are converted as float32 and only the type is
+ // relabelled afterwards - looks like the buffer then holds 32-bit values
+ // under a half-float type; confirm this is intended
+ Status Visit(const HalfFloatType&) { return VisitAs(float32()); }
+ // handle types represented as integers
+ template <typename T>
+ Status Visit(
+ const T& t,
+ typename std::enable_if<std::is_base_of<TimeType, T>::value ||
+ std::is_base_of<DateType, T>::value>::type* = nullptr) {
+ return VisitAs(std::is_same<typename T::c_type, int64_t>::value ? int64() : int32());
+ }
+ // handle binary and string
+ template <typename T>
+ Status Visit(
+ const T& t,
+ typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type* = nullptr) {
+ auto dict_array = static_cast<const DictionaryArray*>(in.get());
+ const StringArray& dict = static_cast<const StringArray&>(*dict_array->dictionary());
+ const Int32Array& indices = static_cast<const Int32Array&>(*dict_array->indices());
+ using Builder = typename TypeTraits<T>::BuilderType;
+ Builder builder(out_type, default_memory_pool());
+ RETURN_NOT_OK(builder.Resize(indices.length()));
+ // first pass: total size of the referenced strings, so that the value
+ // data can be reserved in one step
+ int64_t values_length = 0;
+ for (int64_t i = 0; i != indices.length(); ++i) {
+ if (indices.IsNull(i)) {
+ continue;
+ }
+ values_length += dict.GetView(indices.GetView(i)).size();
+ }
+ RETURN_NOT_OK(builder.ReserveData(values_length));
+ // second pass: append the strings themselves
+ for (int64_t i = 0; i != indices.length(); ++i) {
+ if (indices.IsNull(i)) {
+ builder.UnsafeAppendNull();
+ continue;
+ }
+ auto value = dict.GetView(indices.GetView(i));
+ builder.UnsafeAppend(value);
+ }
+ return builder.Finish(out);
+ }
+ // lists: convert the values recursively; the null bitmap and the offsets
+ // of `in` are reused
+ Status Visit(const ListType& t) {
+ auto list_array = static_cast<const ListArray*>(in.get());
+ std::shared_ptr<Array> values;
+ auto value_type = t.value_type();
+ RETURN_NOT_OK(Convert(value_type, list_array->values(), &values));
+ auto data = ArrayData::Make(out_type, in->length(),
+ {in->null_bitmap(), list_array->value_offsets()},
+ {values->data()}, in->null_count());
+ *out = MakeArray(data);
+ return Status::OK();
+ }
+ // structs: convert child i of `in` to the type of child i of `t`; the
+ // null bitmap of `in` is reused
+ Status Visit(const StructType& t) {
+ auto struct_array = static_cast<const StructArray*>(in.get());
+ std::vector<std::shared_ptr<ArrayData>> child_data(t.num_children());
+ for (int i = 0; i != t.num_children(); ++i) {
+ std::shared_ptr<Array> child;
+ RETURN_NOT_OK(Convert(t.child(i)->type(), struct_array->field(i), &child));
+ child_data[i] = child->data();
+ }
+ auto data = ArrayData::Make(out_type, in->length(), {in->null_bitmap()},
+ std::move(child_data), in->null_count());
+ *out = MakeArray(data);
+ return Status::OK();
+ }
+ // fallback for every type not handled above
+ Status Visit(const DataType& not_impl) {
+ return Status::NotImplemented("JSON parsing of ", not_impl);
+ }
+ // the type to convert to
+ std::shared_ptr<DataType> out_type;
+ // the unconverted input
+ std::shared_ptr<Array> in;
+ // receives the converted array
+ std::shared_ptr<Array>* out;
+};
+
+// Convert the unconverted array `in` to `out_type` by dispatching a
+// ConvertImpl visitor on `out_type`; the result is stored in `*out`.
+static Status Convert(const std::shared_ptr<DataType>& out_type,
+ std::shared_ptr<Array> in, std::shared_ptr<Array>* out) {
+ ConvertImpl visitor = {out_type, in, out};
+ return VisitTypeInline(*out_type, &visitor);
+}
+
+// Convert the unconverted array `in`, inferring the output type wherever
+// `expected` does not provide one.
+//
+// expected: the requested type, or null if the type is to be inferred
+// tag:      "json_kind" metadata describing the kind of `in`
+// in:       the unconverted array
+// out:      receives the converted array
+//
+// Objects and arrays are handled recursively; for scalars a non-null
+// `expected` takes precedence over inference.
+static Status InferAndConvert(std::shared_ptr<DataType> expected,
+ const std::shared_ptr<const KeyValueMetadata>& tag,
+ const std::shared_ptr<Array>& in,
+ std::shared_ptr<Array>* out) {
+ Kind::type kind = KindFromTag(tag);
+ switch (kind) {
+ case Kind::kObject: {
+ // FIXME(bkietz) in general expected fields may not be an exact prefix of parsed's
+ auto in_type = static_cast<StructType*>(in->type().get());
+ if (expected == nullptr) {
+ expected = struct_({});
+ }
+ auto expected_type = static_cast<StructType*>(expected.get());
+ // same number of fields: nothing to infer at this level
+ if (in_type->num_children() == expected_type->num_children()) {
+ return Convert(expected, in, out);
+ }
+
+ auto fields = expected_type->children();
+ fields.resize(in_type->num_children());
+ std::vector<std::shared_ptr<ArrayData>> child_data(in_type->num_children());
+
+ for (int i = 0; i != in_type->num_children(); ++i) {
+ // stays null for fields beyond the expected ones, so their type is inferred
+ std::shared_ptr<DataType> expected_field_type;
+ if (i < expected_type->num_children()) {
+ expected_field_type = expected_type->child(i)->type();
+ }
+ auto in_field = in_type->child(i);
+ auto in_column = static_cast<StructArray*>(in.get())->field(i);
+ std::shared_ptr<Array> column;
+ RETURN_NOT_OK(InferAndConvert(expected_field_type, in_field->metadata(),
+ in_column, &column));
+ fields[i] = field(in_field->name(), column->type());
+ child_data[i] = column->data();
+ }
+ auto data =
+ ArrayData::Make(struct_(std::move(fields)), in->length(), {in->null_bitmap()},
+ std::move(child_data), in->null_count());
+ *out = MakeArray(data);
+ return Status::OK();
+ }
+ case Kind::kArray: {
+ auto list_array = static_cast<const ListArray*>(in.get());
+ auto value_tag = list_array->list_type()->value_field()->metadata();
+ std::shared_ptr<Array> values;
+ if (expected != nullptr) {
+ RETURN_NOT_OK(InferAndConvert(expected->child(0)->type(), value_tag,
+ list_array->values(), &values));
+ } else {
+ RETURN_NOT_OK(InferAndConvert(nullptr, value_tag, list_array->values(), &values));
+ }
+ // the list type is rebuilt from the type of the converted values
+ auto data = ArrayData::Make(list(values->type()), in->length(),
+ {in->null_bitmap(), list_array->value_offsets()},
+ {values->data()}, in->null_count());
+ *out = MakeArray(data);
+ return Status::OK();
+ }
+ default:
+ // an expected type overrides inference for scalars
+ // (but not nested types, which may have unexpected fields)
+ if (expected != nullptr) {
+ return Convert(expected, in, out);
+ }
+ }
+ // only scalars without an expected type reach this point
+ switch (kind) {
+ case Kind::kNull:
+ return Convert(null(), in, out);
+ case Kind::kBoolean:
+ return Convert(boolean(), in, out);
+ case Kind::kNumber:
+ // attempt conversion to Int64 first
+ if (Convert(int64(), in, out).ok()) {
+ return Status::OK();
+ }
+ return Convert(float64(), in, out);
+ case Kind::kString: // attempt conversion to Timestamp first
+ if (Convert(timestamp(TimeUnit::SECOND), in, out).ok()) {
+ return Status::OK();
+ }
+ return Convert(utf8(), in, out);
+ default:
+ return Status::Invalid("invalid JSON kind");
+ }
+}
+
+/// \brief Parse a single block of JSON and convert it to a RecordBatch
+///
+/// \param[in] options parse options; an explicit schema is required unless
+/// unexpected_field_behavior is InferType
+/// \param[in] json the block to parse; it is also handed to the parser as
+/// its scalar storage
+/// \param[out] out the resulting batch. With InferType its schema is built
+/// from the converted columns, otherwise it is the explicit schema.
+/// \return Invalid if a required explicit schema is missing, otherwise the
+/// first error raised by parsing or conversion
+Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
+                std::shared_ptr<RecordBatch>* out) {
+  const bool infer_unexpected =
+      options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType;
+  auto schm = options.explicit_schema;
+  if (!infer_unexpected && schm == nullptr) {
+    // both the BlockParser constructor and the conversion below dereference
+    // the schema in this mode, so fail cleanly before reaching either
+    return Status::Invalid(
+        "an explicit schema is required unless unexpected fields are inferred");
+  }
+
+  BlockParser parser(default_memory_pool(), options, json);
+  RETURN_NOT_OK(parser.Parse(json));
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser.Finish(&parsed));
+
+  std::shared_ptr<Array> converted;
+  if (infer_unexpected) {
+    // stays null when there is no schema: every type is inferred then
+    std::shared_ptr<DataType> expected;
+    if (schm) {
+      expected = struct_(schm->fields());
+    }
+    RETURN_NOT_OK(InferAndConvert(expected, Tag(Kind::kObject), parsed, &converted));
+    schm = schema(converted->type()->children());
+  } else {
+    RETURN_NOT_OK(Convert(struct_(schm->fields()), parsed, &converted));
+  }
+
+  // the columns are taken from (and therefore counted on) the converted array
+  const auto& converted_struct = static_cast<const StructArray&>(*converted);
+  std::vector<std::shared_ptr<Array>> columns(converted->num_fields());
+  for (int i = 0; i != converted->num_fields(); ++i) {
+    columns[i] = converted_struct.field(i);
+  }
+  *out = RecordBatch::Make(schm, parsed->length(), std::move(columns));
+  return Status::OK();
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/json/reader.h b/cpp/src/arrow/json/reader.h
new file mode 100644
index 0000000..6463793
--- /dev/null
+++ b/cpp/src/arrow/json/reader.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_JSON_READER_H
+#define ARROW_JSON_READER_H
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/json/options.h" // IWYU pragma: keep
+#include "arrow/json/parser.h" // IWYU pragma: keep
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+class Table;
+
+namespace io {
+class InputStream;
+} // namespace io
+
+namespace json {
+
+/// Abstract interface for reading JSON into a Table. Instances are obtained
+/// through Make().
+class ARROW_EXPORT TableReader {
+ public:
+ virtual ~TableReader() = default;
+
+ /// Read a table into `*out`; implemented by subclasses.
+ virtual Status Read(std::shared_ptr<Table>* out) = 0;
+
+ /// Factory; the reader is returned through `out`.
+ // XXX pass optional schema?
+ static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+ const ReadOptions&, const ParseOptions&,
+ std::shared_ptr<TableReader>* out);
+};
+
+ARROW_EXPORT Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
+ std::shared_ptr<RecordBatch>* out);
+
+} // namespace json
+} // namespace arrow
+
+#endif // ARROW_JSON_READER_H
diff --git a/cpp/src/arrow/json/test-common.h b/cpp/src/arrow/json/test-common.h
new file mode 100644
index 0000000..0e7f6e3
--- /dev/null
+++ b/cpp/src/arrow/json/test-common.h
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <rapidjson/writer.h>
+
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/string_view.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace json {
+
+using rapidjson::StringBuffer;
+using Writer = rapidjson::Writer<StringBuffer>;
+
+// Adapt a boolean result to a Status: OK for true, Invalid (with an empty
+// message) for false.
+inline static Status OK(bool ok) { return ok ? Status::OK() : Status::Invalid(""); }
+
+template <typename Engine>
+static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer);
+
+template <typename Engine>
+static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
+ Writer* writer);
+
+// Write one random JSON object for the fields of `schm`; forwards to the
+// overload taking a vector of fields.
+template <typename Engine>
+static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e, Writer* writer) {
+ return Generate(schm->fields(), e, writer);
+}
+
+// Visitor (dispatched through VisitTypeInline by Generate()) which writes one
+// random value of the visited type to `writer`, drawing randomness from `e`.
+// Writer calls returning false are reported as Status::Invalid through OK().
+template <typename Engine>
+struct GenerateImpl {
+ // boolean: the lowest bit of a random integer
+ Status Visit(const BooleanType&) {
+ return OK(writer.Bool(std::uniform_int_distribution<uint16_t>{}(e)&1));
+ }
+ // integers: a random int narrowed to the C type of T
+ template <typename T>
+ Status Visit(T const&, enable_if_unsigned_integer<T>* = nullptr) {
+ auto val = std::uniform_int_distribution<>{}(e);
+ return OK(writer.Uint64(static_cast<typename T::c_type>(val)));
+ }
+ template <typename T>
+ Status Visit(T const&, enable_if_signed_integer<T>* = nullptr) {
+ auto val = std::uniform_int_distribution<>{}(e);
+ return OK(writer.Int64(static_cast<typename T::c_type>(val)));
+ }
+ // floating point: normally distributed, mean 0, standard deviation 1024
+ template <typename T>
+ Status Visit(T const&, enable_if_floating_point<T>* = nullptr) {
+ auto val = std::normal_distribution<typename T::c_type>{0, 1 << 10}(e);
+ return OK(writer.Double(val));
+ }
+ // half floats are generated as doubles
+ Status Visit(HalfFloatType const&) {
+ auto val = std::normal_distribution<double>{0, 1 << 10}(e);
+ return OK(writer.Double(val));
+ }
+ // binary and string: a string whose length is Poisson distributed
+ // (mean 4) and whose characters have codes 32 through 127
+ template <typename T>
+ Status Visit(T const&, enable_if_binary<T>* = nullptr) {
+ auto size = std::poisson_distribution<>{4}(e);
+ std::uniform_int_distribution<uint16_t> gen_char(32, 127); // FIXME generate UTF8
+ std::string s(size, '\0');
+ for (char& ch : s) ch = static_cast<char>(gen_char(e));
+ return OK(writer.String(s.c_str()));
+ }
+ // every other type which is neither a number nor derived from BinaryType
+ // and has no dedicated overload cannot be generated
+ template <typename T>
+ Status Visit(
+ T const& t, typename std::enable_if<!is_number<T>::value>::type* = nullptr,
+ typename std::enable_if<!std::is_base_of<BinaryType, T>::value>::type* = nullptr) {
+ return Status::Invalid("can't generate a value of type " + t.name());
+ }
+ // list: an array whose length is Poisson distributed (mean 4)
+ Status Visit(const ListType& t) {
+ auto size = std::poisson_distribution<>{4}(e);
+ writer.StartArray();
+ for (int i = 0; i != size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
+ return OK(writer.EndArray(size));
+ }
+ // struct: an object with one member per child field
+ Status Visit(const StructType& t) { return Generate(t.children(), e, &writer); }
+ // source of randomness
+ Engine& e;
+ // destination of the generated JSON
+ rapidjson::Writer<rapidjson::StringBuffer>& writer;
+};
+
+// Write one random value of `type` to `writer`.
+//
+// With a probability of 0.2 a JSON null is written instead of a value of the
+// requested type. A writer call which reports failure is returned as
+// Status::Invalid, for the null as for every other value.
+template <typename Engine>
+static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer) {
+  if (std::uniform_real_distribution<>{0, 1}(e) < .2) {
+    // one out of 5 chance of null, anywhere
+    return OK(writer->Null());
+  }
+  GenerateImpl<Engine> visitor = {e, *writer};
+  return VisitTypeInline(*type, &visitor);
+}
+
+// Write one JSON object with a member for each of `fields`, in order; each
+// member's value is a random value of the field's type.
+//
+// Every writer call is checked: the first one which reports failure is
+// returned as Status::Invalid.
+template <typename Engine>
+static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
+                       Writer* writer) {
+  RETURN_NOT_OK(OK(writer->StartObject()));
+  for (const auto& f : fields) {
+    RETURN_NOT_OK(OK(writer->Key(f->name().c_str())));
+    RETURN_NOT_OK(Generate(f->type(), e, writer));
+  }
+  // rapidjson takes the member count as SizeType
+  return OK(writer->EndObject(static_cast<rapidjson::SizeType>(fields.size())));
+}
+
+// Copy `data` into a Buffer newly allocated from the default memory pool.
+// Exactly data.size() bytes are allocated and copied; no terminator is
+// appended.
+//
+// Declared inline because this header defines the function: without it,
+// every translation unit including the header would emit its own external
+// definition of the same function.
+inline Status MakeBuffer(util::string_view data, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), data.size(), out));
+  std::copy(std::begin(data), std::end(data), (*out)->mutable_data());
+  return Status::OK();
+}
+
+// scalar values (numbers and strings) are parsed into a
+// dictionary<index:int32, value:string>. This can be decoded for ease of comparison
+//
+// The result holds, for each index, the dictionary string it refers to; null
+// indices are decoded as nulls.
+//
+// Declared inline because this header defines the function: without it,
+// every translation unit including the header would emit its own external
+// definition of the same function.
+inline Status DecodeStringDictionary(const DictionaryArray& dict_array,
+                                     std::shared_ptr<Array>* decoded) {
+  const StringArray& dict = static_cast<const StringArray&>(*dict_array.dictionary());
+  const Int32Array& indices = static_cast<const Int32Array&>(*dict_array.indices());
+  StringBuilder builder;
+  // slots for every element are reserved up front; the value data is
+  // reserved per element right before each unsafe append
+  RETURN_NOT_OK(builder.Resize(indices.length()));
+  for (int64_t i = 0; i != indices.length(); ++i) {
+    if (indices.IsNull(i)) {
+      builder.UnsafeAppendNull();
+      continue;
+    }
+    auto value = dict.GetView(indices.GetView(i));
+    RETURN_NOT_OK(builder.ReserveData(value.size()));
+    builder.UnsafeAppend(value);
+  }
+  return builder.Finish(decoded);
+}
+
+} // namespace json
+} // namespace arrow
diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h
index ceb6885..abae413 100644
--- a/cpp/src/arrow/record_batch.h
+++ b/cpp/src/arrow/record_batch.h
@@ -92,6 +92,14 @@ class ARROW_EXPORT RecordBatch {
/// \return an Array object
virtual std::shared_ptr<Array> column(int i) const = 0;
+ /// \brief Look up a column of the record batch by field name
+ /// \param[in] name name of the field to search the schema for
+ /// \return the matching Array, or null when the schema has no such field
+ std::shared_ptr<Array> GetColumnByName(const std::string& name) const {
+   const auto index = schema_->GetFieldIndex(name);
+   if (index == -1) {
+     return NULLPTR;
+   }
+   return column(index);
+ }
+
 /// \brief Retrieve an array's internal data from the record batch
/// \param[in] i field index, does not boundscheck
/// \return an internal ArrayData object
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 2ac34b4..31c9510 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -248,6 +248,14 @@ class ARROW_EXPORT Table {
/// Return a column by index
virtual std::shared_ptr<Column> column(int i) const = 0;
+ /// \brief Return a column by name
+ /// \param[in] name field name
+ /// \return a Column or null if no field was found
+ std::shared_ptr<Column> GetColumnByName(const std::string& name) const {
+   auto i = schema_->GetFieldIndex(name);
+   // GetFieldIndex reports a missing name as -1
+   return i == -1 ? NULLPTR : column(i);
+ }
+
/// \brief Remove column from the table, producing a new Table
virtual Status RemoveColumn(int i, std::shared_ptr<Table>* out) const = 0;
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 2024899..852ddb0 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -381,11 +381,11 @@ bool Schema::Equals(const Schema& other, bool check_metadata) const {
}
// Return the field named `name`, or nullptr when the schema has no such field.
std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) const {
- int64_t i = GetFieldIndex(name);
+ int i = GetFieldIndex(name);
 return i == -1 ? nullptr : fields_[i];  // GetFieldIndex returns -1 if name not found
}
-int64_t Schema::GetFieldIndex(const std::string& name) const {
+int Schema::GetFieldIndex(const std::string& name) const {
auto it = name_to_index_.find(name);
if (it == name_to_index_.end()) {
return -1;
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 4f61f2d..c775b11 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -827,7 +827,7 @@ class ARROW_EXPORT Schema {
std::shared_ptr<Field> GetFieldByName(const std::string& name) const;
/// Returns -1 if name not found
- int64_t GetFieldIndex(const std::string& name) const;
+ int GetFieldIndex(const std::string& name) const;
const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
diff --git a/cpp/src/arrow/util/stl.h b/cpp/src/arrow/util/stl.h
index 163ed40..4889814 100644
--- a/cpp/src/arrow/util/stl.h
+++ b/cpp/src/arrow/util/stl.h
@@ -18,6 +18,9 @@
#ifndef ARROW_UTIL_STL_H
#define ARROW_UTIL_STL_H
+#include <memory>
+#include <type_traits>
+#include <utility>
#include <vector>
#include "arrow/util/logging.h"
@@ -25,6 +28,20 @@
namespace arrow {
namespace internal {
+/// Construct a single (non-array) T from perfectly forwarded arguments and
+/// hand ownership to a std::unique_ptr<T>. Disabled for array types.
+template <typename T, typename... A>
+typename std::enable_if<!std::is_array<T>::value, std::unique_ptr<T>>::type make_unique(
+    A&&... args) {
+  std::unique_ptr<T> owner(new T(std::forward<A>(args)...));
+  return owner;
+}
+
+/// Allocate an array of `n` elements for an unbounded array type T (U[]) and
+/// hand ownership to a std::unique_ptr<U[]>.
+///
+/// The elements are value-initialized, matching std::make_unique<U[]>(n), so
+/// arithmetic and pointer elements start out zeroed rather than indeterminate.
+template <typename T>
+typename std::enable_if<std::is_array<T>::value && std::extent<T>::value == 0,
+                        std::unique_ptr<T>>::type
+make_unique(std::size_t n) {
+  using value_type = typename std::remove_extent<T>::type;
+  // the trailing `()` requests value-initialization of every element
+  return std::unique_ptr<value_type[]>(new value_type[n]());
+}
+
template <typename T>
inline std::vector<T> DeleteVectorElement(const std::vector<T>& values, size_t index) {
DCHECK(!values.empty());