You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/04/17 14:47:40 UTC
[arrow] branch master updated: ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b496913 ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl
b496913 is described below
commit b496913e5ddb7d8bbef65a59be5a54ac14bf6740
Author: Benjamin Kietzman <be...@gmail.com>
AuthorDate: Wed Apr 17 16:47:31 2019 +0200
ARROW-4708: [C++] refactoring JSON parser to prepare for multithreaded impl
- don't use in-situ parsing
- remove UnsafeStringBuilder (which was only useful when parsing in-situ)
- don't require null termination of parsed buffers
- resize parser's scalar storage when it might overflow
- rewrite chunker to use Buffer instead of string_view
to represent memory ranges
- iwyu + lint cleanup
- add test for parsing JSON with a partial schema
- add test for inferring timestamp type in an unexpected
field of strings
- refactor rapidjson defines into a single header
- correct SSE detection
- disable MSVC conversion warnings (to match GCC and Clang options)
- allow ArrayFromJSON to parse timestamps from strings
Author: Benjamin Kietzman <be...@gmail.com>
Closes #4148 from bkietz/4708-Add-multithreaded-JSON-reader and squashes the following commits:
52edad826 <Benjamin Kietzman> include arrow/util/macros.h for ARROW_BITNESS
d3503250f <Benjamin Kietzman> #include sse-util.h in rapidjson-defs.h for SSE macros
aa613438a <Benjamin Kietzman> correct ConsumeWhitespace for non-SIMD case
e62ce02a6 <Benjamin Kietzman> use arrow sse macros
673ae342e <Benjamin Kietzman> add init for error case to make MSVC happy
e005a89c3 <Benjamin Kietzman> correct SSE detection
11ba680ec <Benjamin Kietzman> refactor rapidjson defines to a single header
cb638a6f5 <Benjamin Kietzman> address review comments
ee8c5e5d4 <Benjamin Kietzman> functions in source files should be static
a8fd17268 <Benjamin Kietzman> disable conversion warnings for MSVC
866af6d54 <Benjamin Kietzman> add iwyu includes, use pragma once instead of guards
2c990bf1f <Benjamin Kietzman> remove redundant KindFromTag()
058607bca <Benjamin Kietzman> fix build error
ab27659a5 <Benjamin Kietzman> refactoring JSON parser to prepare for multithreaded impl
---
cpp/cmake_modules/SetupCxxFlags.cmake | 4 +-
cpp/src/arrow/buffer.h | 29 ++-
cpp/src/arrow/ipc/json-internal.h | 13 +-
cpp/src/arrow/ipc/json-simple-test.cc | 13 +
cpp/src/arrow/ipc/json-simple.cc | 167 ++++++++-----
cpp/src/arrow/json/api.h | 5 +-
cpp/src/arrow/json/chunker-test.cc | 138 +++++------
cpp/src/arrow/json/chunker.cc | 143 ++++++-----
cpp/src/arrow/json/chunker.h | 31 ++-
cpp/src/arrow/json/options.h | 9 +-
cpp/src/arrow/json/parser-benchmark.cc | 74 +++++-
cpp/src/arrow/json/parser-test.cc | 93 ++------
cpp/src/arrow/json/parser.cc | 314 +++++++++++++------------
cpp/src/arrow/json/parser.h | 97 +++-----
cpp/src/arrow/json/{api.h => rapidjson-defs.h} | 30 ++-
cpp/src/arrow/json/reader.cc | 51 ++--
cpp/src/arrow/json/reader.h | 22 +-
cpp/src/arrow/json/test-common.h | 71 ++++--
cpp/src/arrow/type_traits.h | 81 ++++---
cpp/src/arrow/util/parsing.h | 40 +++-
cpp/src/arrow/util/sse-util.h | 14 +-
21 files changed, 780 insertions(+), 659 deletions(-)
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 3ce2dc8..8cb2585 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -138,8 +138,8 @@ endmacro()
if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
# Pre-checkin builds
if("${COMPILER_FAMILY}" STREQUAL "msvc")
- string(REPLACE "/W3" "" CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS}")
- set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3")
+ # https://docs.microsoft.com/en-us/cpp/error-messages/compiler-warnings/compiler-warnings-by-compiler-version
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /W3 /wd4365 /wd4267 /wd4838")
elseif("${COMPILER_FAMILY}" STREQUAL "clang")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Weverything -Wno-c++98-compat \
-Wno-c++98-compat-pedantic -Wno-deprecated -Wno-weak-vtables -Wno-padded \
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 1f1cd4b..20a7969 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -29,6 +29,7 @@
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
+#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"
namespace arrow {
@@ -62,14 +63,14 @@ class ARROW_EXPORT Buffer {
size_(size),
capacity_(size) {}
- /// \brief Construct from std::string without copying memory
+ /// \brief Construct from string_view without copying memory
///
- /// \param[in] data a std::string object
+ /// \param[in] data a string_view object
///
- /// \note The std::string must stay alive for the lifetime of the Buffer, so
- /// temporary rvalue strings must be stored in an lvalue somewhere
- explicit Buffer(const std::string& data)
- : Buffer(reinterpret_cast<const uint8_t*>(data.c_str()),
+ /// \note The memory viewed by data must not be deallocated in the lifetime of the
+ /// Buffer; temporary rvalue strings must be stored in an lvalue somewhere
+ explicit Buffer(util::string_view data)
+ : Buffer(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(data.size())) {}
virtual ~Buffer() = default;
@@ -161,6 +162,12 @@ class ARROW_EXPORT Buffer {
/// \note Can throw std::bad_alloc if buffer is large
std::string ToString() const;
+ /// \brief View buffer contents as a util::string_view
+ /// \return util::string_view
+ explicit operator util::string_view() const {
+ return util::string_view(reinterpret_cast<const char*>(data_), size_);
+ }
+
/// \brief Return a pointer to the buffer's data
const uint8_t* data() const { return data_; }
/// \brief Return a writable pointer to the buffer's data
@@ -230,6 +237,16 @@ ARROW_EXPORT
std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
const int64_t offset, const int64_t length);
+/// \brief Like SliceBuffer, but construct a mutable buffer slice.
+///
+/// If the parent buffer is not mutable, behavior is undefined (it may abort
+/// in debug builds).
+static inline std::shared_ptr<Buffer> SliceMutableBuffer(
+ const std::shared_ptr<Buffer>& buffer, const int64_t offset) {
+ int64_t length = buffer->size() - offset;
+ return SliceMutableBuffer(buffer, offset, length);
+}
+
/// @}
/// \class MutableBuffer
diff --git a/cpp/src/arrow/ipc/json-internal.h b/cpp/src/arrow/ipc/json-internal.h
index c8c7249..b69c8bb 100644
--- a/cpp/src/arrow/ipc/json-internal.h
+++ b/cpp/src/arrow/ipc/json-internal.h
@@ -18,22 +18,11 @@
#ifndef ARROW_IPC_JSON_INTERNAL_H
#define ARROW_IPC_JSON_INTERNAL_H
-#define RAPIDJSON_HAS_STDSTRING 1
-#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
-#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
-
-#define RAPIDJSON_NAMESPACE arrow::rapidjson
-#define RAPIDJSON_NAMESPACE_BEGIN \
- namespace arrow { \
- namespace rapidjson {
-#define RAPIDJSON_NAMESPACE_END \
- } \
- }
-
#include <memory>
#include <sstream>
#include <string>
+#include "arrow/json/rapidjson-defs.h"
#include "rapidjson/document.h" // IWYU pragma: export
#include "rapidjson/encodings.h" // IWYU pragma: export
#include "rapidjson/error/en.h" // IWYU pragma: export
diff --git a/cpp/src/arrow/ipc/json-simple-test.cc b/cpp/src/arrow/ipc/json-simple-test.cc
index 1bb04a3..ad2f175 100644
--- a/cpp/src/arrow/ipc/json-simple-test.cc
+++ b/cpp/src/arrow/ipc/json-simple-test.cc
@@ -321,6 +321,19 @@ TEST(TestString, Basics) {
AssertJSONArray<BinaryType, std::string>(type, "[\"\\u0000\\u001f\"]", {s});
}
+TEST(TestTimestamp, Basics) {
+ // Timestamp type
+ auto type = timestamp(TimeUnit::SECOND);
+ AssertJSONArray<TimestampType, int64_t>(
+ type, R"(["1970-01-01","2000-02-29","3989-07-14","1900-02-28"])",
+ {0, 951782400, 63730281600LL, -2203977600LL});
+
+ type = timestamp(TimeUnit::NANO);
+ AssertJSONArray<TimestampType, int64_t>(
+ type, R"(["1970-01-01","2000-02-29","1900-02-28"])",
+ {0, 951782400000000000LL, -2203977600000000000LL});
+}
+
TEST(TestString, Errors) {
std::shared_ptr<DataType> type = utf8();
std::shared_ptr<Array> array;
diff --git a/cpp/src/arrow/ipc/json-simple.cc b/cpp/src/arrow/ipc/json-simple.cc
index 2861bd7..29da85e 100644
--- a/cpp/src/arrow/ipc/json-simple.cc
+++ b/cpp/src/arrow/ipc/json-simple.cc
@@ -29,6 +29,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
+#include "arrow/util/parsing.h"
#include "arrow/util/string_view.h"
namespace arrow {
@@ -91,8 +92,6 @@ class ConcreteConverter : public Converter {
}
};
-// TODO : dates and times?
-
// ------------------------------------------------------------------------
// Converter for null arrays
@@ -114,7 +113,7 @@ class NullConverter final : public ConcreteConverter<NullConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<NullBuilder> builder_;
};
@@ -145,11 +144,67 @@ class BooleanConverter final : public ConcreteConverter<BooleanConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<BooleanBuilder> builder_;
};
// ------------------------------------------------------------------------
+// Helpers for numeric converters
+
+// Convert single signed integer value (also {Date,Time}{32,64} and Timestamp)
+template <typename T>
+typename std::enable_if<is_signed_integer<T>::value || is_date<T>::value ||
+ is_time<T>::value || is_timestamp<T>::value,
+ Status>::type
+ConvertNumber(const rj::Value& json_obj, typename T::c_type* out) {
+ if (json_obj.IsInt64()) {
+ int64_t v64 = json_obj.GetInt64();
+ *out = static_cast<typename T::c_type>(v64);
+ if (*out == v64) {
+ return Status::OK();
+ } else {
+ return Status::Invalid("Value ", v64, " out of bounds for ",
+ TypeTraits<T>::type_singleton());
+ }
+ } else {
+ *out = static_cast<typename T::c_type>(0);
+ return JSONTypeError("signed int", json_obj.GetType());
+ }
+}
+
+// Convert single unsigned integer value
+template <typename T>
+enable_if_unsigned_integer<T, Status> ConvertNumber(const rj::Value& json_obj,
+ typename T::c_type* out) {
+ if (json_obj.IsUint64()) {
+ uint64_t v64 = json_obj.GetUint64();
+ *out = static_cast<typename T::c_type>(v64);
+ if (*out == v64) {
+ return Status::OK();
+ } else {
+ return Status::Invalid("Value ", v64, " out of bounds for ",
+ TypeTraits<T>::type_singleton());
+ }
+ } else {
+ *out = static_cast<typename T::c_type>(0);
+ return JSONTypeError("unsigned int", json_obj.GetType());
+ }
+}
+
+// Convert single floating point value
+template <typename T>
+enable_if_floating_point<T, Status> ConvertNumber(const rj::Value& json_obj,
+ typename T::c_type* out) {
+ if (json_obj.IsNumber()) {
+ *out = static_cast<typename T::c_type>(json_obj.GetDouble());
+ return Status::OK();
+ } else {
+ *out = static_cast<typename T::c_type>(0);
+ return JSONTypeError("number", json_obj.GetType());
+ }
+}
+
+// ------------------------------------------------------------------------
// Converter for int arrays
template <typename Type>
@@ -169,49 +224,14 @@ class IntegerConverter final : public ConcreteConverter<IntegerConverter<Type>>
if (json_obj.IsNull()) {
return AppendNull();
}
- return AppendNumber(json_obj);
+ c_type value;
+ RETURN_NOT_OK(ConvertNumber<Type>(json_obj, &value));
+ return builder_->Append(value);
}
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
- // Append signed integer value
- template <typename Integer = c_type>
- typename std::enable_if<std::is_signed<Integer>::value, Status>::type AppendNumber(
- const rj::Value& json_obj) {
- if (json_obj.IsInt64()) {
- int64_t v64 = json_obj.GetInt64();
- c_type v = static_cast<c_type>(v64);
- if (v == v64) {
- return builder_->Append(v);
- } else {
- return Status::Invalid("Value ", v64, " out of bounds for ",
- this->type_->ToString());
- }
- } else {
- return JSONTypeError("signed int", json_obj.GetType());
- }
- }
-
- // Append unsigned integer value
- template <typename Integer = c_type>
- typename std::enable_if<std::is_unsigned<Integer>::value, Status>::type AppendNumber(
- const rj::Value& json_obj) {
- if (json_obj.IsUint64()) {
- uint64_t v64 = json_obj.GetUint64();
- c_type v = static_cast<c_type>(v64);
- if (v == v64) {
- return builder_->Append(v);
- } else {
- return Status::Invalid("Value ", v64, " out of bounds for ",
- this->type_->ToString());
- }
- return builder_->Append(v);
- } else {
- return JSONTypeError("unsigned int", json_obj.GetType());
- }
- }
-
+ private:
std::shared_ptr<NumericBuilder<Type>> builder_;
};
@@ -234,17 +254,14 @@ class FloatConverter final : public ConcreteConverter<FloatConverter<Type>> {
if (json_obj.IsNull()) {
return AppendNull();
}
- if (json_obj.IsNumber()) {
- c_type v = static_cast<c_type>(json_obj.GetDouble());
- return builder_->Append(v);
- } else {
- return JSONTypeError("number", json_obj.GetType());
- }
+ c_type value;
+ RETURN_NOT_OK(ConvertNumber<Type>(json_obj, &value));
+ return builder_->Append(value);
}
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<NumericBuilder<Type>> builder_;
};
@@ -281,12 +298,50 @@ class DecimalConverter final : public ConcreteConverter<DecimalConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<DecimalBuilder> builder_;
Decimal128Type* decimal_type_;
};
// ------------------------------------------------------------------------
+// Converter for timestamp arrays
+
+class TimestampConverter final : public ConcreteConverter<TimestampConverter> {
+ public:
+ explicit TimestampConverter(const std::shared_ptr<DataType>& type)
+ : from_string_(type) {
+ this->type_ = type;
+ builder_ = std::make_shared<TimestampBuilder>(type, default_memory_pool());
+ }
+
+ Status AppendNull() override { return builder_->AppendNull(); }
+
+ Status AppendValue(const rj::Value& json_obj) override {
+ if (json_obj.IsNull()) {
+ return AppendNull();
+ }
+ int64_t value;
+ if (json_obj.IsNumber()) {
+ RETURN_NOT_OK(ConvertNumber<Int64Type>(json_obj, &value));
+ } else if (json_obj.IsString()) {
+ auto view = util::string_view(json_obj.GetString(), json_obj.GetStringLength());
+ if (!from_string_(view.data(), view.size(), &value)) {
+ return Status::Invalid("couldn't parse timestamp from ", view);
+ }
+ } else {
+ return JSONTypeError("timestamp", json_obj.GetType());
+ }
+ return builder_->Append(value);
+ }
+
+ std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
+
+ private:
+ ::arrow::internal::StringConverter<TimestampType> from_string_;
+ std::shared_ptr<TimestampBuilder> builder_;
+};
+
+// ------------------------------------------------------------------------
// Converter for binary and string arrays
class StringConverter final : public ConcreteConverter<StringConverter> {
@@ -312,7 +367,7 @@ class StringConverter final : public ConcreteConverter<StringConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<BinaryBuilder> builder_;
};
@@ -349,7 +404,7 @@ class FixedSizeBinaryConverter final
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<FixedSizeBinaryBuilder> builder_;
};
@@ -381,7 +436,7 @@ class ListConverter final : public ConcreteConverter<ListConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<ListBuilder> builder_;
std::shared_ptr<Converter> child_converter_;
};
@@ -456,7 +511,7 @@ class StructConverter final : public ConcreteConverter<StructConverter> {
std::shared_ptr<ArrayBuilder> builder() override { return builder_; }
- protected:
+ private:
std::shared_ptr<StructBuilder> builder_;
std::vector<std::shared_ptr<Converter>> child_converters_;
};
@@ -481,7 +536,7 @@ Status GetConverter(const std::shared_ptr<DataType>& type,
SIMPLE_CONVERTER_CASE(Type::DATE32, IntegerConverter<Date32Type>)
SIMPLE_CONVERTER_CASE(Type::INT64, IntegerConverter<Int64Type>)
SIMPLE_CONVERTER_CASE(Type::TIME64, IntegerConverter<Int64Type>)
- SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, IntegerConverter<Int64Type>)
+ SIMPLE_CONVERTER_CASE(Type::TIMESTAMP, TimestampConverter)
SIMPLE_CONVERTER_CASE(Type::DATE64, IntegerConverter<Date64Type>)
SIMPLE_CONVERTER_CASE(Type::UINT8, IntegerConverter<UInt8Type>)
SIMPLE_CONVERTER_CASE(Type::UINT16, IntegerConverter<UInt16Type>)
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/api.h
index 00fbc2e..47b5668 100644
--- a/cpp/src/arrow/json/api.h
+++ b/cpp/src/arrow/json/api.h
@@ -15,10 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_API_H
-#define ARROW_JSON_API_H
+#pragma once
#include "arrow/json/options.h"
#include "arrow/json/reader.h"
-
-#endif // ARROW_JSON_API_H
diff --git a/cpp/src/arrow/json/chunker-test.cc b/cpp/src/arrow/json/chunker-test.cc
index 88fbc31..f2d9cc2 100644
--- a/cpp/src/arrow/json/chunker-test.cc
+++ b/cpp/src/arrow/json/chunker-test.cc
@@ -15,20 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-#include <cstdint>
#include <memory>
#include <numeric>
#include <string>
#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-#include <rapidjson/prettywriter.h>
-#include <rapidjson/reader.h>
+#include "arrow/buffer.h"
#include "arrow/json/chunker.h"
-#include "arrow/json/options.h"
-#include "arrow/testing/gtest_common.h"
-#include "arrow/testing/util.h"
+#include "arrow/json/test-common.h"
+#include "arrow/testing/gtest_util.h"
#include "arrow/util/string_view.h"
namespace arrow {
@@ -41,89 +37,88 @@ namespace json {
using util::string_view;
template <typename Lines>
-std::string join(Lines&& lines, std::string delimiter) {
- std::string joined;
+static std::shared_ptr<Buffer> join(Lines&& lines, std::string delimiter) {
+ std::shared_ptr<Buffer> joined;
+ BufferVector line_buffers;
+ auto delimiter_buffer = std::make_shared<Buffer>(delimiter);
for (const auto& line : lines) {
- joined += line + delimiter;
+ line_buffers.push_back(std::make_shared<Buffer>(line));
+ line_buffers.push_back(delimiter_buffer);
}
+ ABORT_NOT_OK(ConcatenateBuffers(line_buffers, default_memory_pool(), &joined));
return joined;
}
-std::string PrettyPrint(string_view one_line) {
- rapidjson::Document document;
- document.Parse(one_line.data());
- rapidjson::StringBuffer sb;
- rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
- document.Accept(writer);
- return sb.GetString();
+static bool WhitespaceOnly(string_view s) {
+ return s.find_first_not_of(" \t\r\n") == string_view::npos;
}
-bool WhitespaceOnly(string_view s) {
- return s.find_first_not_of(" \t\r\n") == string_view::npos;
+static bool WhitespaceOnly(const std::shared_ptr<Buffer>& b) {
+ return WhitespaceOnly(string_view(*b));
}
-std::size_t ConsumeWholeObject(string_view* str) {
- auto fail = [str] {
- *str = string_view();
+static std::size_t ConsumeWholeObject(std::shared_ptr<Buffer>* buf) {
+ auto str = string_view(**buf);
+ auto fail = [buf] {
+ *buf = nullptr;
return string_view::npos;
};
- if (WhitespaceOnly(*str)) return fail();
- auto open_brace = str->find_first_not_of(" \t\r\n");
- if (str->at(open_brace) != '{') return fail();
- auto close_brace = str->find_first_of("}");
+ if (WhitespaceOnly(str)) return fail();
+ auto open_brace = str.find_first_not_of(" \t\r\n");
+ if (str.at(open_brace) != '{') return fail();
+ auto close_brace = str.find_first_of("}");
if (close_brace == string_view::npos) return fail();
- if (str->at(close_brace) != '}') return fail();
auto length = close_brace + 1;
- *str = str->substr(length);
+ *buf = SliceBuffer(*buf, length);
return length;
}
-void AssertWholeObjects(Chunker& chunker, string_view block, int expected_count) {
- string_view whole;
- ASSERT_OK(chunker.Process(block, &whole));
+void AssertWholeObjects(Chunker& chunker, const std::shared_ptr<Buffer>& block,
+ int expected_count) {
+ std::shared_ptr<Buffer> whole, partial;
+ ASSERT_OK(chunker.Process(block, &whole, &partial));
int count = 0;
- while (!WhitespaceOnly(whole)) {
+ while (whole && !WhitespaceOnly(whole)) {
if (ConsumeWholeObject(&whole) == string_view::npos) FAIL();
++count;
}
ASSERT_EQ(count, expected_count);
}
-void AssertChunking(Chunker& chunker, std::string str, int total_count) {
+void AssertChunking(Chunker& chunker, std::shared_ptr<Buffer> buf, int total_count) {
// First chunkize whole JSON block
- AssertWholeObjects(chunker, str, total_count);
+ AssertWholeObjects(chunker, buf, total_count);
// Then chunkize incomplete substrings of the block
- for (int i = 0; i != total_count; ++i) {
+ for (int i = 0; i < total_count; ++i) {
// ensure shearing the closing brace off the last object causes it to be chunked out
- string_view str_view(str);
- auto last_brace = str_view.find_last_of('}');
- AssertWholeObjects(chunker, str.substr(0, last_brace), total_count - i - 1);
+ auto last_brace = string_view(*buf).find_last_of('}');
+ AssertWholeObjects(chunker, SliceBuffer(buf, 0, last_brace), total_count - i - 1);
// ensure skipping one object reduces the count by one
- ASSERT_NE(ConsumeWholeObject(&str_view), string_view::npos);
- str = str_view.to_string();
- AssertWholeObjects(chunker, str, total_count - i - 1);
+ ASSERT_NE(ConsumeWholeObject(&buf), string_view::npos);
+ AssertWholeObjects(chunker, buf, total_count - i - 1);
}
}
-void AssertStraddledChunking(Chunker& chunker, string_view str) {
- auto first_half = str.substr(0, str.size() / 2).to_string();
- auto second_half = str.substr(str.size() / 2);
+void AssertStraddledChunking(Chunker& chunker, const std::shared_ptr<Buffer>& buf) {
+ auto first_half = SliceBuffer(buf, 0, buf->size() / 2);
+ auto second_half = SliceBuffer(buf, buf->size() / 2);
AssertChunking(chunker, first_half, 1);
- string_view first_whole;
- ASSERT_OK(chunker.Process(first_half, &first_whole));
- ASSERT_TRUE(string_view(first_half).starts_with(first_whole));
- auto partial = string_view(first_half).substr(first_whole.size());
- string_view completion;
- ASSERT_OK(chunker.Process(partial, second_half, &completion));
- ASSERT_TRUE(second_half.starts_with(completion));
- auto straddling = partial.to_string() + completion.to_string();
- string_view straddling_view(straddling);
- auto length = ConsumeWholeObject(&straddling_view);
+ std::shared_ptr<Buffer> first_whole, partial;
+ ASSERT_OK(chunker.Process(first_half, &first_whole, &partial));
+ ASSERT_TRUE(string_view(*first_half).starts_with(string_view(*first_whole)));
+ std::shared_ptr<Buffer> completion, rest;
+ ASSERT_OK(chunker.ProcessWithPartial(partial, second_half, &completion, &rest));
+ ASSERT_TRUE(string_view(*second_half).starts_with(string_view(*completion)));
+ std::shared_ptr<Buffer> straddling;
+ ASSERT_OK(
+ ConcatenateBuffers({partial, completion}, default_memory_pool(), &straddling));
+ auto length = ConsumeWholeObject(&straddling);
ASSERT_NE(length, string_view::npos);
ASSERT_NE(length, 0);
- auto final_whole = second_half.substr(completion.size());
+ auto final_whole = SliceBuffer(second_half, completion->size());
+ ASSERT_EQ(string_view(*final_whole), string_view(*rest));
length = ConsumeWholeObject(&final_whole);
ASSERT_NE(length, string_view::npos);
ASSERT_NE(length, 0);
@@ -142,12 +137,12 @@ class BaseChunkerTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<Chunker> chunker_;
};
-INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
-
INSTANTIATE_TEST_CASE_P(NoNewlineChunkerTest, BaseChunkerTest, ::testing::Values(false));
+INSTANTIATE_TEST_CASE_P(ChunkerTest, BaseChunkerTest, ::testing::Values(true));
+
constexpr auto object_count = 3;
-const std::vector<std::string>& lines() {
+static const std::vector<std::string>& lines() {
static const std::vector<std::string> l = {R"({"0":"ab","1":"c","2":""})",
R"({"0":"def","1":"","2":"gh"})",
R"({"0":"","1":"ij","2":"kl"})"};
@@ -159,8 +154,10 @@ TEST_P(BaseChunkerTest, Basics) {
}
TEST_P(BaseChunkerTest, Empty) {
- AssertChunking(*chunker_, "\n", 0);
- AssertChunking(*chunker_, "\n\n", 0);
+ auto empty = std::make_shared<Buffer>("\n");
+ AssertChunking(*chunker_, empty, 0);
+ empty = std::make_shared<Buffer>("\n\n");
+ AssertChunking(*chunker_, empty, 0);
}
TEST(ChunkerTest, PrettyPrinted) {
@@ -193,15 +190,18 @@ TEST(ChunkerTest, StraddlingSingleLine) {
}
TEST_P(BaseChunkerTest, StraddlingEmpty) {
- auto joined = join(lines(), "\n");
- auto first = string_view(joined).substr(0, lines()[0].size() + 1);
- auto rest = string_view(joined).substr(first.size());
- string_view first_whole;
- ASSERT_OK(chunker_->Process(first, &first_whole));
- auto partial = first.substr(first_whole.size());
- string_view completion;
- ASSERT_OK(chunker_->Process(partial, rest, &completion));
- ASSERT_EQ(completion.size(), 0);
+ auto all = join(lines(), "\n");
+
+ auto first = SliceBuffer(all, 0, lines()[0].size() + 1);
+ std::shared_ptr<Buffer> first_whole, partial;
+ ASSERT_OK(chunker_->Process(first, &first_whole, &partial));
+ ASSERT_TRUE(WhitespaceOnly(partial));
+
+ auto others = SliceBuffer(all, first->size());
+ std::shared_ptr<Buffer> completion, rest;
+ ASSERT_OK(chunker_->ProcessWithPartial(partial, others, &completion, &rest));
+ ASSERT_EQ(completion->size(), 0);
+ ASSERT_TRUE(rest->Equals(*others));
}
} // namespace json
diff --git a/cpp/src/arrow/json/chunker.cc b/cpp/src/arrow/json/chunker.cc
index 7b992df..c154a2a 100644
--- a/cpp/src/arrow/json/chunker.cc
+++ b/cpp/src/arrow/json/chunker.cc
@@ -18,81 +18,82 @@
#include "arrow/json/chunker.h"
#include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <string>
#include <utility>
#include <vector>
-#if defined(ARROW_HAVE_SSE4_2)
-#define RAPIDJSON_SSE42 1
-#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
-#endif
-#if defined(ARROW_HAVE_SSE2)
-#define RAPIDJSON_SSE2 1
-#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
-#endif
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/reader.h"
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
+#include "arrow/buffer.h"
+#include "arrow/json/options.h"
#include "arrow/util/stl.h"
#include "arrow/util/string_view.h"
namespace arrow {
namespace json {
+namespace rj = arrow::rapidjson;
+
using internal::make_unique;
using util::string_view;
-Status StraddlingTooLarge() {
+static Status StraddlingTooLarge() {
return Status::Invalid("straddling object straddles two block boundaries");
}
-std::size_t ConsumeWhitespace(string_view* str) {
+static size_t ConsumeWhitespace(std::shared_ptr<Buffer>* buf) {
#if defined(ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD)
- auto nonws_begin =
- rapidjson::SkipWhitespace_SIMD(str->data(), str->data() + str->size());
- auto ws_count = nonws_begin - str->data();
- *str = str->substr(ws_count);
- return static_cast<std::size_t>(ws_count);
-#undef ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD
+ auto data = reinterpret_cast<const char*>((*buf)->data());
+ auto nonws_begin = rj::SkipWhitespace_SIMD(data, data + (*buf)->size());
+ auto ws_count = nonws_begin - data;
+ *buf = SliceBuffer(*buf, ws_count);
+ return static_cast<size_t>(ws_count);
#else
- auto ws_count = str->find_first_not_of(" \t\r\n");
- *str = str->substr(ws_count);
+ auto ws_count = string_view(**buf).find_first_not_of(" \t\r\n");
+ if (ws_count == string_view::npos) {
+ ws_count = (*buf)->size();
+ }
+ *buf = SliceBuffer(*buf, ws_count);
return ws_count;
#endif
}
class NewlinesStrictlyDelimitChunker : public Chunker {
public:
- Status Process(string_view block, string_view* chunked) override {
- auto last_newline = block.find_last_of("\n\r");
+ Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+ std::shared_ptr<Buffer>* partial) override {
+ auto last_newline = string_view(*block).find_last_of("\n\r");
if (last_newline == string_view::npos) {
// no newlines in this block, return empty chunk
- *chunked = string_view();
+ *whole = SliceBuffer(block, 0, 0);
+ *partial = block;
} else {
- *chunked = block.substr(0, last_newline + 1);
+ *whole = SliceBuffer(block, 0, last_newline + 1);
+ *partial = SliceBuffer(block, last_newline + 1);
}
return Status::OK();
}
- Status Process(string_view partial, string_view block,
- string_view* completion) override {
+ Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
+ const std::shared_ptr<Buffer>& block,
+ std::shared_ptr<Buffer>* completion,
+ std::shared_ptr<Buffer>* rest) override {
+ auto partial = partial_original;
ConsumeWhitespace(&partial);
- if (partial.size() == 0) {
+ if (partial->size() == 0) {
// if partial is empty, don't bother looking for completion
- *completion = string_view();
+ *completion = SliceBuffer(block, 0, 0);
+ *rest = block;
return Status::OK();
}
- auto first_newline = block.find_first_of("\n\r");
+ auto first_newline = string_view(*block).find_first_of("\n\r");
if (first_newline == string_view::npos) {
// no newlines in this block; straddling object straddles *two* block boundaries.
// retry with larger buffer
return StraddlingTooLarge();
}
- *completion = block.substr(0, first_newline + 1);
+ *completion = SliceBuffer(block, 0, first_newline + 1);
+ *rest = SliceBuffer(block, first_newline + 1);
return Status::OK();
}
};
@@ -104,7 +105,12 @@ class MultiStringStream {
using Ch = char;
explicit MultiStringStream(std::vector<string_view> strings)
: strings_(std::move(strings)) {
- std::remove(strings_.begin(), strings_.end(), string_view(""));
+ std::reverse(strings_.begin(), strings_.end());
+ }
+ explicit MultiStringStream(const BufferVector& buffers) : strings_(buffers.size()) {
+ for (size_t i = 0; i < buffers.size(); ++i) {
+ strings_[i] = string_view(*buffers[i]);
+ }
std::reverse(strings_.begin(), strings_.end());
}
char Peek() const {
@@ -122,35 +128,35 @@ class MultiStringStream {
++index_;
return taken;
}
- std::size_t Tell() { return index_; }
+ size_t Tell() { return index_; }
void Put(char) { ARROW_LOG(FATAL) << "not implemented"; }
void Flush() { ARROW_LOG(FATAL) << "not implemented"; }
char* PutBegin() {
ARROW_LOG(FATAL) << "not implemented";
return nullptr;
}
- std::size_t PutEnd(char*) {
+ size_t PutEnd(char*) {
ARROW_LOG(FATAL) << "not implemented";
return 0;
}
private:
- std::size_t index_ = 0;
+ size_t index_ = 0;
std::vector<string_view> strings_;
};
template <typename Stream>
-std::size_t ConsumeWholeObject(Stream&& stream) {
- static constexpr unsigned parse_flags = rapidjson::kParseIterativeFlag |
- rapidjson::kParseStopWhenDoneFlag |
- rapidjson::kParseNumbersAsStringsFlag;
- rapidjson::BaseReaderHandler<rapidjson::UTF8<>> handler;
- rapidjson::Reader reader;
+static size_t ConsumeWholeObject(Stream&& stream) {
+ static constexpr unsigned parse_flags = rj::kParseIterativeFlag |
+ rj::kParseStopWhenDoneFlag |
+ rj::kParseNumbersAsStringsFlag;
+ rj::BaseReaderHandler<rj::UTF8<>> handler;
+ rj::Reader reader;
// parse a single JSON object
switch (reader.Parse<parse_flags>(stream, handler).Code()) {
- case rapidjson::kParseErrorNone:
+ case rj::kParseErrorNone:
return stream.Tell();
- case rapidjson::kParseErrorDocumentEmpty:
+ case rj::kParseErrorDocumentEmpty:
return 0;
default:
// rapidjson emitted an error, the most recent object was partial
@@ -160,37 +166,44 @@ std::size_t ConsumeWholeObject(Stream&& stream) {
class ParsingChunker : public Chunker {
public:
- Status Process(string_view block, string_view* chunked) override {
- if (block.size() == 0) {
- *chunked = string_view();
+ Status Process(const std::shared_ptr<Buffer>& block, std::shared_ptr<Buffer>* whole,
+ std::shared_ptr<Buffer>* partial) override {
+ if (block->size() == 0) {
+ *whole = SliceBuffer(block, 0, 0);
+ *partial = block;
return Status::OK();
}
- std::size_t total_length = 0;
- for (auto consumed = block;; consumed = block.substr(total_length)) {
- using rapidjson::MemoryStream;
- MemoryStream ms(consumed.data(), consumed.size());
- using InputStream = rapidjson::EncodedInputStream<rapidjson::UTF8<>, MemoryStream>;
+ size_t total_length = 0;
+ for (auto consumed = block;; consumed = SliceBuffer(block, total_length)) {
+ rj::MemoryStream ms(reinterpret_cast<const char*>(consumed->data()),
+ consumed->size());
+ using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
auto length = ConsumeWholeObject(InputStream(ms));
if (length == string_view::npos || length == 0) {
// found incomplete object or consumed is empty
break;
}
- if (length > consumed.size()) {
- total_length += consumed.size();
+ if (static_cast<int64_t>(length) > consumed->size()) {
+ total_length += consumed->size();
break;
}
total_length += length;
}
- *chunked = block.substr(0, total_length);
+ *whole = SliceBuffer(block, 0, total_length);
+ *partial = SliceBuffer(block, total_length);
return Status::OK();
}
- Status Process(string_view partial, string_view block,
- string_view* completion) override {
+ Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial_original,
+ const std::shared_ptr<Buffer>& block,
+ std::shared_ptr<Buffer>* completion,
+ std::shared_ptr<Buffer>* rest) override {
+ auto partial = partial_original;
ConsumeWhitespace(&partial);
- if (partial.size() == 0) {
+ if (partial->size() == 0) {
// if partial is empty, don't bother looking for completion
- *completion = string_view();
+ *completion = SliceBuffer(block, 0, 0);
+ *rest = block;
return Status::OK();
}
auto length = ConsumeWholeObject(MultiStringStream({partial, block}));
@@ -199,12 +212,14 @@ class ParsingChunker : public Chunker {
// retry with larger buffer
return StraddlingTooLarge();
}
- *completion = block.substr(0, length - partial.size());
+ auto completion_length = length - partial->size();
+ *completion = SliceBuffer(block, 0, completion_length);
+ *rest = SliceBuffer(block, completion_length);
return Status::OK();
}
};
-std::unique_ptr<Chunker> Chunker::Make(ParseOptions options) {
+std::unique_ptr<Chunker> Chunker::Make(const ParseOptions& options) {
if (!options.newlines_in_values) {
return make_unique<NewlinesStrictlyDelimitChunker>();
}
diff --git a/cpp/src/arrow/json/chunker.h b/cpp/src/arrow/json/chunker.h
index a74abf2..0f94d81 100644
--- a/cpp/src/arrow/json/chunker.h
+++ b/cpp/src/arrow/json/chunker.h
@@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_CHUNKER_H
-#define ARROW_JSON_CHUNKER_H
+#pragma once
#include <memory>
-#include "arrow/json/options.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
-#include "arrow/util/sse-util.h"
-#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"
namespace arrow {
+
+class Buffer;
+
namespace json {
+struct ParseOptions;
+
/// \class Chunker
/// \brief A reusable block-based chunker for JSON data
///
@@ -41,17 +42,23 @@ class ARROW_EXPORT Chunker {
/// \brief Carve up a chunk in a block of data to contain only whole objects
/// \param[in] block json data to be chunked
- /// \param[out] chunked subrange of block containing whole json objects
- virtual Status Process(util::string_view block, util::string_view* chunked) = 0;
+ /// \param[out] whole subrange of block containing whole json objects
+ /// \param[out] partial subrange of block containing a partial json object
+ virtual Status Process(const std::shared_ptr<Buffer>& block,
+ std::shared_ptr<Buffer>* whole,
+ std::shared_ptr<Buffer>* partial) = 0;
/// \brief Carve the completion of a partial object out of a block
/// \param[in] partial incomplete json object
/// \param[in] block json data
- /// \param[out] completion subrange of block contining the completion of partial
- virtual Status Process(util::string_view partial, util::string_view block,
- util::string_view* completion) = 0;
+ /// \param[out] completion subrange of block containing the completion of partial
+ /// \param[out] rest subrange of block containing what completion does not cover
+ virtual Status ProcessWithPartial(const std::shared_ptr<Buffer>& partial,
+ const std::shared_ptr<Buffer>& block,
+ std::shared_ptr<Buffer>* completion,
+ std::shared_ptr<Buffer>* rest) = 0;
- static std::unique_ptr<Chunker> Make(ParseOptions options);
+ static std::unique_ptr<Chunker> Make(const ParseOptions& options);
protected:
Chunker() = default;
@@ -60,5 +67,3 @@ class ARROW_EXPORT Chunker {
} // namespace json
} // namespace arrow
-
-#endif // ARROW_JSON_CHUNKER_H
diff --git a/cpp/src/arrow/json/options.h b/cpp/src/arrow/json/options.h
index 0ec1b29..8d27faa 100644
--- a/cpp/src/arrow/json/options.h
+++ b/cpp/src/arrow/json/options.h
@@ -15,20 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_OPTIONS_H
-#define ARROW_JSON_OPTIONS_H
+#pragma once
#include <cstdint>
#include <memory>
-#include <string>
-#include <unordered_map>
-#include "arrow/type.h"
#include "arrow/util/visibility.h"
namespace arrow {
class DataType;
+class Schema;
namespace json {
@@ -64,5 +61,3 @@ struct ARROW_EXPORT ReadOptions {
} // namespace json
} // namespace arrow
-
-#endif // ARROW_JSON_OPTIONS_H
diff --git a/cpp/src/arrow/json/parser-benchmark.cc b/cpp/src/arrow/json/parser-benchmark.cc
index 82f0b33..bdf83af 100644
--- a/cpp/src/arrow/json/parser-benchmark.cc
+++ b/cpp/src/arrow/json/parser-benchmark.cc
@@ -17,9 +17,9 @@
#include "benchmark/benchmark.h"
-#include <iostream>
#include <string>
+#include "arrow/json/chunker.h"
#include "arrow/json/options.h"
#include "arrow/json/parser.h"
#include "arrow/json/test-common.h"
@@ -28,22 +28,72 @@
namespace arrow {
namespace json {
+// Benchmark Chunker::Process on one in-memory block of JSON.
+// The chunker is built once from `options` and reused for every iteration;
+// each iteration splits `json` into its whole objects and a trailing partial
+// object, and throughput is reported against the full size of `json`.
+static void BenchmarkJSONChunking(benchmark::State& state, // NOLINT non-const reference
+ const std::shared_ptr<Buffer>& json,
+ ParseOptions options) {
+ auto chunker = Chunker::Make(options);
+ for (auto _ : state) {
+ // outputs are discarded; only the cost of chunking is measured
+ std::shared_ptr<Buffer> chunked, partial;
+ ABORT_NOT_OK(chunker->Process(json, &chunked, &partial));
+ }
+ state.SetBytesProcessed(state.iterations() * json->size());
+}
+
+// Chunk 5000 pretty-printed rows of {"int": int32, "str": utf8}.
+// newlines_in_values is true, so Chunker::Make does not pick the
+// newline-delimited chunker; PrettyPrint presumably puts newlines inside
+// each object, which is why that is required here.
+static void BM_ChunkJSONPrettyPrinted(
+ benchmark::State& state) { // NOLINT non-const reference
+ const int32_t num_rows = 5000;
+ auto options = ParseOptions::Defaults();
+ options.newlines_in_values = true;
+ options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+ // default-constructed engine => the same rows are generated on every run
+ std::default_random_engine engine;
+ std::string json;
+ for (int i = 0; i < num_rows; ++i) {
+ StringBuffer sb;
+ Writer writer(sb);
+ ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+ json += PrettyPrint(sb.GetString());
+ json += "\n";
+ }
+ BenchmarkJSONChunking(state, std::make_shared<Buffer>(json), options);
+}
+
+BENCHMARK(BM_ChunkJSONPrettyPrinted)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+// Chunk 5000 compact rows of {"int": int32, "str": utf8}, one per line.
+// newlines_in_values is false, so Chunker::Make returns the
+// NewlinesStrictlyDelimitChunker.
+static void BM_ChunkJSONLineDelimited(
+ benchmark::State& state) { // NOLINT non-const reference
+ const int32_t num_rows = 5000;
+ auto options = ParseOptions::Defaults();
+ options.newlines_in_values = false;
+ options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
+ // default-constructed engine => the same rows are generated on every run
+ std::default_random_engine engine;
+ std::string json;
+ for (int i = 0; i < num_rows; ++i) {
+ StringBuffer sb;
+ Writer writer(sb);
+ ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
+ json += sb.GetString();
+ json += "\n";
+ }
+ BenchmarkJSONChunking(state, std::make_shared<Buffer>(json), options);
+}
+
+BENCHMARK(BM_ChunkJSONLineDelimited)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
+
+// Benchmark BlockParser on one block of JSON. A fresh parser is made in every
+// iteration (so construction is timed too), the run aborts unless exactly
+// `num_rows` rows were parsed, and the result is finished into an Array.
static void BenchmarkJSONParsing(benchmark::State& state, // NOLINT non-const reference
- const std::string& json, int32_t num_rows,
+ const std::shared_ptr<Buffer>& json, int32_t num_rows,
ParseOptions options) {
for (auto _ : state) {
- std::shared_ptr<Buffer> src;
- ABORT_NOT_OK(MakeBuffer(json, &src));
- BlockParser parser(options, src);
- ABORT_NOT_OK(parser.Parse(src));
- if (parser.num_rows() != num_rows) {
+ std::unique_ptr<BlockParser> parser;
+ ABORT_NOT_OK(BlockParser::Make(options, &parser));
+ ABORT_NOT_OK(parser->Parse(json));
+ if (parser->num_rows() != num_rows) {
+ // NOTE(review): this diff removes the file's #include <iostream>, but
+ // std::cerr is still used here; presumably it arrives transitively.
std::cerr << "Parsing incomplete\n";
std::abort();
}
std::shared_ptr<Array> parsed;
- ABORT_NOT_OK(parser.Finish(&parsed));
+ ABORT_NOT_OK(parser->Finish(&parsed));
}
- state.SetBytesProcessed(state.iterations() * json.size());
+ state.SetBytesProcessed(state.iterations() * json->size());
}
static void BM_ParseJSONBlockWithSchema(
@@ -52,16 +102,16 @@ static void BM_ParseJSONBlockWithSchema(
auto options = ParseOptions::Defaults();
options.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
options.explicit_schema = schema({field("int", int32()), field("str", utf8())});
- std::mt19937_64 engine;
+ std::default_random_engine engine;
std::string json;
- for (int i = 0; i != num_rows; ++i) {
+ for (int i = 0; i < num_rows; ++i) {
StringBuffer sb;
Writer writer(sb);
ABORT_NOT_OK(Generate(options.explicit_schema, engine, &writer));
json += sb.GetString();
json += "\n";
}
- BenchmarkJSONParsing(state, json, num_rows, options);
+ BenchmarkJSONParsing(state, std::make_shared<Buffer>(json), num_rows, options);
}
BENCHMARK(BM_ParseJSONBlockWithSchema)->MinTime(1.0)->Unit(benchmark::kMicrosecond);
diff --git a/cpp/src/arrow/json/parser-test.cc b/cpp/src/arrow/json/parser-test.cc
index c3af4a7..5d14024 100644
--- a/cpp/src/arrow/json/parser-test.cc
+++ b/cpp/src/arrow/json/parser-test.cc
@@ -15,33 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-#include <cstdint>
-#include <iomanip>
#include <string>
#include <utility>
#include <vector>
#include <gtest/gtest.h>
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
-#include "arrow/ipc/json-simple.h"
#include "arrow/json/options.h"
#include "arrow/json/parser.h"
-#include "arrow/json/reader.h"
#include "arrow/json/test-common.h"
#include "arrow/status.h"
-#include "arrow/testing/util.h"
-#include "arrow/util/logging.h"
+#include "arrow/testing/gtest_util.h"
#include "arrow/util/string_view.h"
namespace arrow {
+namespace json {
using util::string_view;
-namespace json {
-
-std::string scalars_only_src() {
+static std::string scalars_only_src() {
return R"(
{ "hello": 3.5, "world": false, "yo": "thing" }
{ "hello": 3.2, "world": null }
@@ -50,7 +42,7 @@ std::string scalars_only_src() {
)";
}
-std::string nested_src() {
+static std::string nested_src() {
return R"(
{ "hello": 3.5, "world": false, "yo": "thing", "arr": [1, 2, 3], "nuf": {} }
{ "hello": 3.2, "world": null, "arr": [2], "nuf": null }
@@ -59,9 +51,10 @@ std::string nested_src() {
)";
}
-void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual);
+void AssertUnconvertedStructArraysEqual(const StructArray& expected,
+ const StructArray& actual);
-void AssertRawArraysEqual(const Array& expected, const Array& actual) {
+void AssertUnconvertedArraysEqual(const Array& expected, const Array& actual) {
switch (actual.type_id()) {
case Type::BOOL:
case Type::NA:
@@ -81,43 +74,38 @@ void AssertRawArraysEqual(const Array& expected, const Array& actual) {
AssertBufferEqual(*expected_offsets, *actual_offsets);
auto expected_values = static_cast<const ListArray&>(expected).values();
auto actual_values = static_cast<const ListArray&>(actual).values();
- return AssertRawArraysEqual(*expected_values, *actual_values);
+ return AssertUnconvertedArraysEqual(*expected_values, *actual_values);
}
case Type::STRUCT:
ASSERT_EQ(expected.type_id(), Type::STRUCT);
- return AssertRawStructArraysEqual(static_cast<const StructArray&>(expected),
- static_cast<const StructArray&>(actual));
+ return AssertUnconvertedStructArraysEqual(static_cast<const StructArray&>(expected),
+ static_cast<const StructArray&>(actual));
default:
FAIL();
}
}
-void AssertRawStructArraysEqual(const StructArray& expected, const StructArray& actual) {
+// Compare two unconverted struct arrays field by field: same number of fields,
+// same field names in the same order, and each child array compared with
+// AssertUnconvertedArraysEqual.
+void AssertUnconvertedStructArraysEqual(const StructArray& expected,
+ const StructArray& actual) {
ASSERT_EQ(expected.num_fields(), actual.num_fields());
- for (int i = 0; i != expected.num_fields(); ++i) {
+ for (int i = 0; i < expected.num_fields(); ++i) {
auto expected_name = expected.type()->child(i)->name();
auto actual_name = actual.type()->child(i)->name();
ASSERT_EQ(expected_name, actual_name);
- AssertRawArraysEqual(*expected.field(i), *actual.field(i));
+ AssertUnconvertedArraysEqual(*expected.field(i), *actual.field(i));
}
}
+// Parse `src_str` with `options`, then check that the column named by each
+// entry of `fields` matches the JSON array literal at the same position of
+// `columns_json` (compared as unconverted arrays).
void AssertParseColumns(ParseOptions options, string_view src_str,
const std::vector<std::shared_ptr<Field>>& fields,
const std::vector<std::string>& columns_json) {
- std::shared_ptr<Buffer> src;
- ASSERT_OK(MakeBuffer(src_str, &src));
- BlockParser parser(options, src);
- ASSERT_OK(parser.Parse(src));
std::shared_ptr<Array> parsed;
- ASSERT_OK(parser.Finish(&parsed));
+ ASSERT_OK(ParseFromString(options, src_str, &parsed));
auto struct_array = std::static_pointer_cast<StructArray>(parsed);
- for (size_t i = 0; i != fields.size(); ++i) {
- // std::shared_ptr<Array> column_expected;
- // ASSERT_OK(ArrayFromJSON(fields[i]->type(), columns_json[i], &column_expected));
+ // columns_json[i] is indexed in lockstep with fields[i]
+ ASSERT_EQ(fields.size(), columns_json.size());
+ for (size_t i = 0; i < fields.size(); ++i) {
auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
auto column = struct_array->GetFieldByName(fields[i]->name());
+ // GetFieldByName returns null for a missing field: fail cleanly rather
+ // than dereferencing it below
+ ASSERT_NE(column, nullptr) << "no parsed column named " << fields[i]->name();
- AssertRawArraysEqual(*column_expected, *column);
+ AssertUnconvertedArraysEqual(*column_expected, *column);
}
}
@@ -160,10 +148,8 @@ TEST(BlockParserWithSchema, FailOnInconvertible) {
auto options = ParseOptions::Defaults();
options.explicit_schema = schema({field("a", int32())});
options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
- std::shared_ptr<Buffer> src;
- ASSERT_OK(MakeBuffer("{\"a\":0}\n{\"a\":true}", &src));
- BlockParser parser(options, src);
- ASSERT_RAISES(Invalid, parser.Parse(src));
+ std::shared_ptr<Array> parsed;
+ ASSERT_RAISES(Invalid, ParseFromString(options, "{\"a\":0}\n{\"a\":true}", &parsed));
}
TEST(BlockParserWithSchema, Nested) {
@@ -183,10 +169,8 @@ TEST(BlockParserWithSchema, FailOnIncompleteJson) {
auto options = ParseOptions::Defaults();
options.explicit_schema = schema({field("a", int32())});
options.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
- std::shared_ptr<Buffer> src;
- ASSERT_OK(MakeBuffer("{\"a\":0, \"b\"", &src));
- BlockParser parser(options, src);
- ASSERT_RAISES(Invalid, parser.Parse(src));
+ std::shared_ptr<Array> parsed;
+ ASSERT_RAISES(Invalid, ParseFromString(options, "{\"a\":0, \"b\"", &parsed));
}
TEST(BlockParser, Basics) {
@@ -210,40 +194,5 @@ TEST(BlockParser, Nested) {
R"([{"ps":null}, null, {"ps":"78"}, {"ps":"90"}])"});
}
-void AssertParseOne(ParseOptions options, string_view src_str,
- const std::vector<std::shared_ptr<Field>>& fields,
- const std::vector<std::string>& columns_json) {
- std::shared_ptr<Buffer> src;
- ASSERT_OK(MakeBuffer(src_str, &src));
- std::shared_ptr<RecordBatch> parsed;
- ASSERT_OK(ParseOne(options, src, &parsed));
- for (size_t i = 0; i != fields.size(); ++i) {
- auto column_expected = ArrayFromJSON(fields[i]->type(), columns_json[i]);
- auto column = parsed->GetColumnByName(fields[i]->name());
- AssertArraysEqual(*column_expected, *column);
- }
-}
-
-TEST(ParseOne, Basics) {
- auto options = ParseOptions::Defaults();
- options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
- AssertParseOne(
- options, scalars_only_src(),
- {field("hello", float64()), field("world", boolean()), field("yo", utf8())},
- {"[3.5, 3.2, 3.4, 0.0]", "[false, null, null, true]",
- "[\"thing\", null, \"\xe5\xbf\x8d\", null]"});
-}
-
-TEST(ParseOne, Nested) {
- auto options = ParseOptions::Defaults();
- options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
- AssertParseOne(
- options, nested_src(),
- {field("yo", utf8()), field("arr", list(int64())),
- field("nuf", struct_({field("ps", int64())}))},
- {"[\"thing\", null, \"\xe5\xbf\x8d\", null]", R"([[1, 2, 3], [2], [], null])",
- R"([{"ps":null}, null, {"ps":78}, {"ps":90}])"});
-}
-
} // namespace json
} // namespace arrow
diff --git a/cpp/src/arrow/json/parser.cc b/cpp/src/arrow/json/parser.cc
index 99b8911..3fbd1d7 100644
--- a/cpp/src/arrow/json/parser.cc
+++ b/cpp/src/arrow/json/parser.cc
@@ -17,95 +17,109 @@
#include "arrow/json/parser.h"
-#include <algorithm>
-#include <cstdio>
#include <limits>
-#include <sstream>
#include <tuple>
+#include <unordered_map>
#include <utility>
#include <vector>
-#include <rapidjson/error/en.h>
-#include <rapidjson/reader.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/error/en.h"
+#include "rapidjson/reader.h"
#include "arrow/array.h"
#include "arrow/buffer-builder.h"
#include "arrow/builder.h"
-#include "arrow/csv/converter.h"
#include "arrow/memory_pool.h"
-#include "arrow/record_batch.h"
-#include "arrow/status.h"
#include "arrow/type.h"
-#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
#include "arrow/util/stl.h"
+#include "arrow/util/string_view.h"
+#include "arrow/util/trie.h"
#include "arrow/visitor_inline.h"
namespace arrow {
namespace json {
+namespace rj = arrow::rapidjson;
+
using internal::BitsetStack;
using internal::checked_cast;
+using internal::make_unique;
using util::string_view;
+// Build an Invalid status prefixed with "JSON parse error: "; the arguments
+// are forwarded to Status::Invalid and concatenated into the message.
template <typename... T>
-Status ParseError(T&&... t) {
+static Status ParseError(T&&... t) {
return Status::Invalid("JSON parse error: ", std::forward<T>(t)...);
}
-Status KindChangeError(Kind::type from, Kind::type to) {
- auto from_name = Tag(from)->value(0);
- auto to_name = Tag(to)->value(0);
- return ParseError("A column changed from ", from_name, " to ", to_name);
+// Parse error for a column whose JSON kind changed between values; the kinds
+// are reported by their Kind::Name.
+static Status KindChangeError(Kind::type from, Kind::type to) {
+ return ParseError("A column changed from ", Kind::Name(from), " to ", Kind::Name(to));
}
-/// Similar to StringBuilder, but appends bytes into the provided buffer without
-/// resizing. This builder does not support appending nulls.
-class UnsafeStringBuilder {
- public:
- UnsafeStringBuilder(MemoryPool* pool, const std::shared_ptr<Buffer>& buffer)
- : offsets_builder_(pool), values_buffer_(buffer) {
- DCHECK_NE(values_buffer_, nullptr);
- }
-
- Status Append(string_view str) {
- DCHECK_LE(static_cast<int64_t>(str.size()), capacity() - values_end_);
- RETURN_NOT_OK(AppendNextOffset());
- std::memcpy(values_buffer_->mutable_data() + values_end_, str.data(), str.size());
- length_ += 1;
- values_end_ += str.size();
- return Status::OK();
- }
-
- // Builder may not be reused after Finish()
- Status Finish(std::shared_ptr<Array>* out, int64_t* values_length = nullptr) && {
- RETURN_NOT_OK(AppendNextOffset());
- if (values_length) {
- *values_length = values_end_;
- }
- std::shared_ptr<Buffer> offsets;
- RETURN_NOT_OK(offsets_builder_.Finish(&offsets));
- auto data = ArrayData::Make(utf8(), length_, {nullptr, offsets, values_buffer_}, 0);
- *out = MakeArray(data);
- return Status::OK();
- }
+// Human-readable name of a JSON kind ("null", "boolean", ...), used in error
+// messages and as the value of the "json_kind" field tag. `kind` indexes the
+// table directly, so it must be a valid Kind::type (no bounds check).
+const std::string& Kind::Name(Kind::type kind) {
+ static const std::string names[] = {"null", "boolean", "number",
+ "string", "array", "object"};
- int64_t length() { return length_; }
-
- int64_t capacity() { return values_buffer_->size(); }
+ return names[kind];
+}
- int64_t remaining_capacity() { return values_buffer_->size() - values_end_; }
+// Field metadata recording a column's JSON kind as the single pair
+// {"json_kind": Kind::Name(kind)}. The tags are built once (function-local
+// static) and shared by every caller; `kind` must be a valid Kind::type.
+const std::shared_ptr<const KeyValueMetadata>& Kind::Tag(Kind::type kind) {
+ static const std::shared_ptr<const KeyValueMetadata> tags[] = {
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kNull)}}),
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kBoolean)}}),
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kNumber)}}),
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kString)}}),
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kArray)}}),
+ key_value_metadata({{"json_kind", Kind::Name(Kind::kObject)}}),
+ };
+ return tags[kind];
+}
- private:
- Status AppendNextOffset() {
- return offsets_builder_.Append(static_cast<int32_t>(values_end_));
+// Build a trie over every Kind::Name, appended in Kind enum order. Kind::FromTag
+// casts the index found in this trie straight to Kind::type, which presumably
+// relies on trie indices following insertion order.
+static internal::Trie MakeFromTagTrie() {
+ internal::TrieBuilder builder;
+ for (auto kind : {Kind::kNull, Kind::kBoolean, Kind::kNumber, Kind::kString,
+ Kind::kArray, Kind::kObject}) {
+ DCHECK_OK(builder.Append(Kind::Name(kind)));
}
+ auto name_to_kind = builder.Finish();
+ DCHECK_OK(name_to_kind.Validate());
+ return name_to_kind;
+}
- int64_t length_ = 0;
- int64_t values_end_ = 0;
- TypedBufferBuilder<int32_t> offsets_builder_;
- std::shared_ptr<Buffer> values_buffer_;
-};
+// Inverse of Kind::Tag: read the "json_kind" entry of a field's metadata and
+// map its name back to a Kind. The key must be present and hold one of the
+// names from Kind::Name; both requirements are only checked by DCHECKs.
+Kind::type Kind::FromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
+ static internal::Trie name_to_kind = MakeFromTagTrie();
+ // look each index up once, instead of once for the DCHECK and again for use
+ const auto key_index = tag->FindKey("json_kind");
+ DCHECK_NE(key_index, -1);
+ util::string_view name = tag->value(key_index);
+ const auto kind_index = name_to_kind.Find(name);
+ DCHECK_NE(kind_index, -1);
+ // MakeFromTagTrie appends names in Kind enum order, so the trie index is
+ // (presumably) the enum value
+ return static_cast<Kind::type>(kind_index);
+}
+
+// Determine the JSON kind that values of an Arrow type are parsed from
+// (dictionary types use their value type). Writes the result to *kind;
+// returns NotImplemented for types with no JSON representation here.
+Status Kind::ForType(const DataType& type, Kind::type* kind) {
+ struct {
+ Status Visit(const NullType&) { return SetKind(Kind::kNull); }
+ Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
+ Status Visit(const Number&) { return SetKind(Kind::kNumber); }
+ Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
+ Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
+ Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
+ Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
+ // timestamps are parsed from JSON strings; TimestampType is presumably not
+ // derived from any type above, so it would otherwise hit NotImplemented
+ Status Visit(const TimestampType&) { return SetKind(Kind::kString); }
+ Status Visit(const DictionaryType& dict_type) {
+ return Kind::ForType(*dict_type.dictionary()->type(), kind_);
+ }
+ Status Visit(const ListType&) { return SetKind(Kind::kArray); }
+ Status Visit(const StructType&) { return SetKind(Kind::kObject); }
+ Status Visit(const DataType& not_impl) {
+ return Status::NotImplemented("JSON parsing of ", not_impl);
+ }
+ Status SetKind(Kind::type kind) {
+ *kind_ = kind;
+ return Status::OK();
+ }
+ Kind::type* kind_;
+ } visitor = {kind};
+ return VisitTypeInline(type, &visitor);
+}
/// \brief ArrayBuilder for parsed but unconverted arrays
template <Kind::type>
@@ -202,8 +216,9 @@ class ScalarBuilder {
explicit ScalarBuilder(MemoryPool* pool)
: data_builder_(pool), null_bitmap_builder_(pool) {}
- Status Append(int32_t index) {
+ Status Append(int32_t index, int32_t value_length) {
RETURN_NOT_OK(data_builder_.Append(index));
+ values_length_ += value_length;
return null_bitmap_builder_.Append(true);
}
@@ -229,9 +244,10 @@ class ScalarBuilder {
int64_t length() { return null_bitmap_builder_.length(); }
- // TODO(bkietz) track total length of bytes for later simpler allocation
+ int32_t values_length() { return values_length_; }
private:
+ int32_t values_length_ = 0; // running sum of value_length passed to Append()
TypedBufferBuilder<int32_t> data_builder_;
TypedBufferBuilder<bool> null_bitmap_builder_;
};
@@ -280,8 +296,8 @@ class RawArrayBuilder<Kind::kArray> {
RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap));
std::shared_ptr<Array> values;
RETURN_NOT_OK(handler.Finish(value_builder_, &values));
- auto type = list(
- field("item", values->type(), value_builder_.nullable, Tag(value_builder_.kind)));
+ auto type = list(field("item", values->type(), value_builder_.nullable,
+ Kind::Tag(value_builder_.kind)));
*out = MakeArray(ArrayData::Make(type, size, {null_bitmap, offsets}, {values->data()},
null_count));
return Status::OK();
@@ -346,12 +362,12 @@ class RawArrayBuilder<Kind::kObject> {
std::vector<std::shared_ptr<Field>> fields(num_fields());
std::vector<std::shared_ptr<ArrayData>> child_data(num_fields());
- for (int i = 0; i != num_fields(); ++i) {
+ for (int i = 0; i < num_fields(); ++i) {
std::shared_ptr<Array> values;
RETURN_NOT_OK(handler.Finish(field_builders_[i], &values));
child_data[i] = values->data();
fields[i] = field(field_names[i].to_string(), values->type(),
- field_builders_[i].nullable, Tag(field_builders_[i].kind));
+ field_builders_[i].nullable, Kind::Tag(field_builders_[i].kind));
}
*out = MakeArray(ArrayData::Make(struct_(std::move(fields)), size, {null_bitmap},
@@ -367,11 +383,11 @@ class RawArrayBuilder<Kind::kObject> {
TypedBufferBuilder<bool> null_bitmap_builder_;
};
-/// Three implementations are provided for BlockParser::Impl, one for each
+/// Three implementations are provided for BlockParser, one for each
/// UnexpectedFieldBehavior. However most of the logic is identical in each
/// case, so the majority of the implementation is in this base class
-class HandlerBase : public BlockParser::Impl,
- public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, HandlerBase> {
+class HandlerBase : public BlockParser,
+ public rj::BaseReaderHandler<rj::UTF8<>, HandlerBase> {
public:
/// Retrieve a pointer to a builder from a BuilderPtr
template <Kind::type kind>
@@ -384,9 +400,9 @@ class HandlerBase : public BlockParser::Impl,
/// Accessor for a stored error Status
Status Error() { return status_; }
- /// \defgroup rapidjson-handler-interface functions expected by rapidjson::Reader
+ /// \defgroup rapidjson-handler-interface functions expected by rj::Reader
///
- /// bool Key(const char* data, rapidjson::SizeType size, ...) is omitted since
+ /// bool Key(const char* data, rj::SizeType size, ...) is omitted since
/// the behavior varies greatly between UnexpectedFieldBehaviors
///
/// @{
@@ -400,12 +416,12 @@ class HandlerBase : public BlockParser::Impl,
return status_.ok();
}
- bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
status_ = AppendScalar<Kind::kNumber>(string_view(data, size));
return status_.ok();
}
- bool String(const char* data, rapidjson::SizeType size, ...) {
+ bool String(const char* data, rj::SizeType size, ...) {
status_ = AppendScalar<Kind::kString>(string_view(data, size));
return status_.ok();
}
@@ -425,22 +441,19 @@ class HandlerBase : public BlockParser::Impl,
return status_.ok();
}
- bool EndArray(rapidjson::SizeType size) {
+ bool EndArray(rj::SizeType size) {
status_ = EndArrayImpl(size);
return status_.ok();
}
/// @}
/// \brief Set up builders using an expected Schema
- Status SetSchema(const Schema& s) {
- DCHECK_EQ(arena<Kind::kObject>().size(), 1);
- for (const auto& f : s.fields()) {
- BuilderPtr field_builder;
- RETURN_NOT_OK(MakeBuilder(*f->type(), 0, &field_builder));
- field_builder.nullable = f->nullable();
- Cast<Kind::kObject>(builder_)->AddField(f->name(), field_builder);
+ Status Initialize(const std::shared_ptr<Schema>& s) {
+ auto type = struct_({});
+ if (s) {
+ type = struct_(s->fields());
}
- return Status::OK();
+ return MakeBuilder(*type, 0, &builder_);
}
Status Finish(BuilderPtr builder, std::shared_ptr<Array>* out) {
@@ -466,59 +479,67 @@ class HandlerBase : public BlockParser::Impl,
}
Status Finish(std::shared_ptr<Array>* parsed) override {
- RETURN_NOT_OK(std::move(scalar_values_builder_).Finish(&scalar_values_));
+ RETURN_NOT_OK(scalar_values_builder_.Finish(&scalar_values_));
return Finish(builder_, parsed);
}
- int32_t num_rows() override { return num_rows_; }
+ explicit HandlerBase(MemoryPool* pool)
+ : BlockParser(pool), scalar_values_builder_(pool) {}
protected:
- HandlerBase(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
- : pool_(pool),
- builder_(Kind::kObject, 0, false),
- scalar_values_builder_(pool, scalar_storage) {
- arena<Kind::kObject>().emplace_back(pool_);
- }
-
/// finish a column of scalar values (string or number)
Status FinishScalar(ScalarBuilder* builder, std::shared_ptr<Array>* out) {
std::shared_ptr<Array> indices;
+ // TODO(bkietz) embed builder->values_length() in this output somehow
RETURN_NOT_OK(builder->Finish(&indices));
return DictionaryArray::FromArrays(dictionary(int32(), scalar_values_), indices, out);
}
- template <typename Handler>
- Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
- constexpr auto parse_flags =
- rapidjson::kParseInsituFlag | rapidjson::kParseIterativeFlag |
- rapidjson::kParseStopWhenDoneFlag | rapidjson::kParseNumbersAsStringsFlag;
- auto json_data = reinterpret_cast<char*>(json->mutable_data());
- rapidjson::GenericInsituStringStream<rapidjson::UTF8<>> ss(json_data);
- rapidjson::Reader reader;
-
- for (; num_rows_ != kMaxParserNumRows; ++num_rows_) {
- // parse a single line of JSON
- auto ok = reader.Parse<parse_flags>(ss, handler);
+ /// Parse consecutive top-level JSON values from `json`, feeding each one to
+ /// `handler` and counting it in num_rows_. NaN/Infinity literals are accepted
+ /// and numbers are delivered as raw strings. Returns OK once the input is
+ /// exhausted, the handler's stored error if it aborted, a ParseError for
+ /// malformed JSON, or Invalid after kMaxParserNumRows rows.
+ template <typename Handler, typename Stream>
+ Status DoParse(Handler& handler, Stream&& json) {
+ constexpr auto parse_flags = rj::kParseIterativeFlag | rj::kParseNanAndInfFlag |
+ rj::kParseStopWhenDoneFlag |
+ rj::kParseNumbersAsStringsFlag;
+
+ rj::Reader reader;
+
+ for (; num_rows_ < kMaxParserNumRows; ++num_rows_) {
+ auto ok = reader.Parse<parse_flags>(json, handler);
switch (ok.Code()) {
- case rapidjson::kParseErrorNone:
+ case rj::kParseErrorNone:
// parse the next object
continue;
- case rapidjson::kParseErrorDocumentEmpty: {
+ case rj::kParseErrorDocumentEmpty:
// parsed all objects, finish
return Status::OK();
- }
- case rapidjson::kParseErrorTermination:
+ case rj::kParseErrorTermination:
// handler emitted an error
return handler.Error();
default:
- // rapidjson emitted an error
- return ParseError(rapidjson::GetParseError_En(ok.Code()));
+ // rapidjson emitted an error
+ // FIXME(bkietz) report more error data (at least the byte range of the current
+ // block, and maybe the path to the most recently parsed value?)
+ return ParseError(rj::GetParseError_En(ok.Code()));
}
}
return Status::Invalid("Exceeded maximum rows");
}
- /// construct a builder of staticallly defined kind in arenas_
+ /// Parse every JSON value in the block `json`.
+ ///
+ /// AppendScalar() copies scalars into scalar_values_builder_ with
+ /// UnsafeAppend, so room for json->size() more value bytes is reserved up
+ /// front; scalars parsed from `json` presumably never occupy more bytes
+ /// than `json` itself. ReserveData() takes the number of *additional* bytes
+ /// and does nothing when enough capacity remains. Passing only the
+ /// shortfall (json->size() - available) would leave room for just that
+ /// many bytes, not json->size().
+ template <typename Handler>
+ Status DoParse(Handler& handler, const std::shared_ptr<Buffer>& json) {
+ RETURN_NOT_OK(scalar_values_builder_.ReserveData(json->size()));
+
+ rj::MemoryStream ms(reinterpret_cast<const char*>(json->data()), json->size());
+ using InputStream = rj::EncodedInputStream<rj::UTF8<>, rj::MemoryStream>;
+ return DoParse(handler, InputStream(ms));
+ }
+
+ /// construct a builder of statically defined kind in arenas_
template <Kind::type kind>
Status MakeBuilder(int64_t leading_nulls, BuilderPtr* builder) {
builder->index = static_cast<uint32_t>(arena<kind>().size());
@@ -531,7 +552,7 @@ class HandlerBase : public BlockParser::Impl,
/// construct a builder of whatever kind corresponds to a DataType
Status MakeBuilder(const DataType& t, int64_t leading_nulls, BuilderPtr* builder) {
Kind::type kind;
- RETURN_NOT_OK(KindForType(t, &kind));
+ RETURN_NOT_OK(Kind::ForType(t, &kind));
switch (kind) {
case Kind::kNull:
*builder = BuilderPtr(Kind::kNull, static_cast<uint32_t>(leading_nulls), true);
@@ -606,7 +627,7 @@ class HandlerBase : public BlockParser::Impl,
auto root = builder_;
auto struct_builder = Cast<Kind::kObject>(builder_);
RETURN_NOT_OK(struct_builder->AppendNull());
- for (int i = 0; i != struct_builder->num_fields(); ++i) {
+ for (int i = 0; i < struct_builder->num_fields(); ++i) {
builder_ = struct_builder->field_builder(i);
RETURN_NOT_OK(AppendNull());
}
@@ -632,8 +653,11 @@ class HandlerBase : public BlockParser::Impl,
return IllegallyChangedTo(kind);
}
auto index = static_cast<int32_t>(scalar_values_builder_.length());
- RETURN_NOT_OK(Cast<kind>(builder_)->Append(index));
- return scalar_values_builder_.Append(scalar);
+ auto value_length = static_cast<int32_t>(scalar.size());
+ RETURN_NOT_OK(Cast<kind>(builder_)->Append(index, value_length));
+ RETURN_NOT_OK(scalar_values_builder_.Reserve(1));
+ scalar_values_builder_.UnsafeAppend(scalar);
+ return Status::OK();
}
/// @}
@@ -668,7 +692,7 @@ class HandlerBase : public BlockParser::Impl,
auto parent = Cast<Kind::kObject>(builder_stack_.back());
auto expected_count = absent_fields_stack_.TopSize();
- for (field_index_ = 0; field_index_ != expected_count; ++field_index_) {
+ for (field_index_ = 0; field_index_ < expected_count; ++field_index_) {
if (!absent_fields_stack_[field_index_]) {
continue;
}
@@ -694,7 +718,7 @@ class HandlerBase : public BlockParser::Impl,
return Status::OK();
}
- Status EndArrayImpl(rapidjson::SizeType size) {
+ Status EndArrayImpl(rj::SizeType size) {
PopStacks();
// append to list_builder here
auto list_builder = Cast<Kind::kArray>(builder_);
@@ -730,7 +754,6 @@ class HandlerBase : public BlockParser::Impl,
}
Status status_;
- MemoryPool* pool_;
std::tuple<std::tuple<>, std::vector<RawArrayBuilder<Kind::kBoolean>>,
std::vector<RawArrayBuilder<Kind::kNumber>>,
std::vector<RawArrayBuilder<Kind::kString>>,
@@ -747,9 +770,8 @@ class HandlerBase : public BlockParser::Impl,
int field_index_;
// top of this stack == field_index_
std::vector<int> field_index_stack_;
- UnsafeStringBuilder scalar_values_builder_;
+ StringBuilder scalar_values_builder_;
std::shared_ptr<Array> scalar_values_;
- int32_t num_rows_ = 0;
};
template <UnexpectedFieldBehavior>
@@ -758,8 +780,7 @@ class Handler;
template <>
class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
public:
- Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
- : HandlerBase(pool, scalar_storage) {}
+ using HandlerBase::HandlerBase;
Status Parse(const std::shared_ptr<Buffer>& json) override {
return DoParse(*this, json);
@@ -768,7 +789,7 @@ class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
/// \ingroup rapidjson-handler-interface
///
/// if an unexpected field is encountered, emit a parse error and bail
- bool Key(const char* key, rapidjson::SizeType len, ...) {
+ bool Key(const char* key, rj::SizeType len, ...) {
if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
return true;
}
@@ -780,8 +801,7 @@ class Handler<UnexpectedFieldBehavior::Error> : public HandlerBase {
template <>
class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
public:
- Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
- : HandlerBase(pool, scalar_storage) {}
+ using HandlerBase::HandlerBase;
Status Parse(const std::shared_ptr<Buffer>& json) override {
return DoParse(*this, json);
@@ -801,14 +821,14 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
return HandlerBase::Bool(value);
}
- bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
if (Skipping()) {
return true;
}
return HandlerBase::RawNumber(data, size);
}
- bool String(const char* data, rapidjson::SizeType size, ...) {
+ bool String(const char* data, rj::SizeType size, ...) {
if (Skipping()) {
return true;
}
@@ -826,7 +846,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
/// \ingroup rapidjson-handler-interface
///
/// if an unexpected field is encountered, skip until its value has been consumed
- bool Key(const char* key, rapidjson::SizeType len, ...) {
+ bool Key(const char* key, rj::SizeType len, ...) {
MaybeStopSkipping();
if (Skipping()) {
return true;
@@ -854,7 +874,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
return HandlerBase::StartArray();
}
- bool EndArray(rapidjson::SizeType size) {
+ bool EndArray(rj::SizeType size) {
if (Skipping()) {
return true;
}
@@ -877,8 +897,7 @@ class Handler<UnexpectedFieldBehavior::Ignore> : public HandlerBase {
template <>
class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
public:
- Handler(MemoryPool* pool, const std::shared_ptr<Buffer>& scalar_storage)
- : HandlerBase(pool, scalar_storage) {}
+ using HandlerBase::HandlerBase;
Status Parse(const std::shared_ptr<Buffer>& json) override {
return DoParse(*this, json);
@@ -891,14 +910,14 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
return HandlerBase::Bool(value);
}
- bool RawNumber(const char* data, rapidjson::SizeType size, ...) {
+ bool RawNumber(const char* data, rj::SizeType size, ...) {
if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kNumber>())) {
return false;
}
return HandlerBase::RawNumber(data, size);
}
- bool String(const char* data, rapidjson::SizeType size, ...) {
+ bool String(const char* data, rj::SizeType size, ...) {
if (ARROW_PREDICT_FALSE(MaybePromoteFromNull<Kind::kString>())) {
return false;
}
@@ -918,7 +937,7 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
/// the current parent builder. It is added as a NullBuilder with
/// (parent.length - 1) leading nulls. The next value parsed
/// will probably trigger promotion of this field from null
- bool Key(const char* key, rapidjson::SizeType len, ...) {
+ bool Key(const char* key, rj::SizeType len, ...) {
if (ARROW_PREDICT_TRUE(SetFieldBuilder(string_view(key, len)))) {
return true;
}
@@ -967,41 +986,30 @@ class Handler<UnexpectedFieldBehavior::InferType> : public HandlerBase {
}
};
-BlockParser::BlockParser(MemoryPool* pool, ParseOptions options,
- const std::shared_ptr<Buffer>& scalar_storage)
- : pool_(pool), options_(options) {
- DCHECK(options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ||
- options_.explicit_schema != nullptr);
- switch (options_.unexpected_field_behavior) {
+/// Construct the Handler<> specialization selected by
+/// options.unexpected_field_behavior, allocating from pool, then initialize it
+/// with options.explicit_schema.
+///
+/// An explicit schema is required unless the behavior is InferType. NOTE(review):
+/// this is only enforced by DCHECK, which is compiled out of release builds; a
+/// null schema with Ignore/Error would then reach Initialize() -- confirm that
+/// Initialize() tolerates it.
+Status BlockParser::Make(MemoryPool* pool, const ParseOptions& options,
+ std::unique_ptr<BlockParser>* out) {
+ DCHECK(options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType ||
+ options.explicit_schema != nullptr);
+
+ switch (options.unexpected_field_behavior) {
case UnexpectedFieldBehavior::Ignore: {
- auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(
- pool_, scalar_storage);
- // FIXME(bkietz) move this to an Initialize()
- ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
- impl_ = std::move(handler);
+ *out = make_unique<Handler<UnexpectedFieldBehavior::Ignore>>(pool);
break;
}
case UnexpectedFieldBehavior::Error: {
- auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::Error>>(
- pool_, scalar_storage);
- ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
- impl_ = std::move(handler);
+ *out = make_unique<Handler<UnexpectedFieldBehavior::Error>>(pool);
break;
}
case UnexpectedFieldBehavior::InferType:
- auto handler = internal::make_unique<Handler<UnexpectedFieldBehavior::InferType>>(
- pool_, scalar_storage);
- if (options.explicit_schema) {
- ARROW_IGNORE_EXPR(handler->SetSchema(*options_.explicit_schema));
- }
- impl_ = std::move(handler);
+ *out = make_unique<Handler<UnexpectedFieldBehavior::InferType>>(pool);
break;
}
+ // Every Handler<> specialization derives from HandlerBase, so this downcast is
+ // valid. Unlike the removed constructor, initialization errors are propagated
+ // to the caller instead of being ignored.
+ return static_cast<HandlerBase&>(**out).Initialize(options.explicit_schema);
}
-BlockParser::BlockParser(ParseOptions options,
- const std::shared_ptr<Buffer>& scalar_storage)
- : BlockParser(default_memory_pool(), options, scalar_storage) {}
+/// Same as the overload above, allocating from default_memory_pool().
+Status BlockParser::Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out) {
+ return BlockParser::Make(default_memory_pool(), options, out);
+}
} // namespace json
} // namespace arrow
diff --git a/cpp/src/arrow/json/parser.h b/cpp/src/arrow/json/parser.h
index c001598..22d4af9 100644
--- a/cpp/src/arrow/json/parser.h
+++ b/cpp/src/arrow/json/parser.h
@@ -15,114 +15,71 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_PARSER_H
-#define ARROW_JSON_PARSER_H
+#pragma once
#include <cstdint>
#include <memory>
#include <string>
-#include <unordered_map>
-#include "arrow/builder.h"
#include "arrow/json/options.h"
-#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
-#include "arrow/util/string_view.h"
#include "arrow/util/visibility.h"
-#include "arrow/visitor_inline.h"
namespace arrow {
+class Array;
+class Buffer;
+class DataType;
class MemoryPool;
-class RecordBatch;
+class KeyValueMetadata;
+class ResizableBuffer;
namespace json {
struct Kind {
enum type : uint8_t { kNull, kBoolean, kNumber, kString, kArray, kObject };
-};
-inline static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type k) {
- static std::shared_ptr<const KeyValueMetadata> tags[] = {
- key_value_metadata({{"json_kind", "null"}}),
- key_value_metadata({{"json_kind", "boolean"}}),
- key_value_metadata({{"json_kind", "number"}}),
- key_value_metadata({{"json_kind", "string"}}),
- key_value_metadata({{"json_kind", "array"}}),
- key_value_metadata({{"json_kind", "object"}})};
- return tags[static_cast<uint8_t>(k)];
-}
-
-inline static Status KindForType(const DataType& type, Kind::type* kind) {
- struct {
- Status Visit(const NullType&) { return SetKind(Kind::kNull); }
- Status Visit(const BooleanType&) { return SetKind(Kind::kBoolean); }
- Status Visit(const Number&) { return SetKind(Kind::kNumber); }
- // XXX should TimeType & DateType be Kind::kNumber or Kind::kString?
- Status Visit(const TimeType&) { return SetKind(Kind::kNumber); }
- Status Visit(const DateType&) { return SetKind(Kind::kNumber); }
- Status Visit(const BinaryType&) { return SetKind(Kind::kString); }
- // XXX should Decimal128Type be Kind::kNumber or Kind::kString?
- Status Visit(const FixedSizeBinaryType&) { return SetKind(Kind::kString); }
- Status Visit(const DictionaryType& dict_type) {
- return KindForType(*dict_type.dictionary()->type(), kind_);
- }
- Status Visit(const ListType&) { return SetKind(Kind::kArray); }
- Status Visit(const StructType&) { return SetKind(Kind::kObject); }
- Status Visit(const DataType& not_impl) {
- return Status::NotImplemented("JSON parsing of ", not_impl);
- }
- Status SetKind(Kind::type kind) {
- *kind_ = kind;
- return Status::OK();
- }
- Kind::type* kind_;
- } visitor = {kind};
- return VisitTypeInline(type, &visitor);
-}
+ /// \brief Name of a Kind. NOTE(review): presumably "null", "boolean", "number",
+ /// "string", "array", "object" as in the removed inline Tag() -- confirm in parser.cc.
+ static const std::string& Name(Kind::type);
+
+ /// \brief KeyValueMetadata used to tag a parsed field with its JSON kind.
+ /// The removed inline version returned {"json_kind", <name>}; the out-of-line
+ /// definition is not visible in this header.
+ static const std::shared_ptr<const KeyValueMetadata>& Tag(Kind::type);
+
+ /// \brief Recover the Kind from a tag produced by Tag() (presumed inverse).
+ static Kind::type FromTag(const std::shared_ptr<const KeyValueMetadata>& tag);
+
+ /// \brief Determine which Kind the parser emits for values of an Arrow type.
+ /// The removed inline KindForType returned NotImplemented for unsupported
+ /// types; presumably the out-of-line definition keeps that contract.
+ static Status ForType(const DataType& type, Kind::type* kind);
+};
constexpr int32_t kMaxParserNumRows = 100000;
/// \class BlockParser
/// \brief A reusable block-based parser for JSON data
///
-/// The parser takes a block of newline delimited JSON data and extracts
-/// keys and value pairs, inserting into provided ArrayBuilders.
-/// Parsed data is own by the
-/// parser, so the original buffer can be discarded after Parse() returns.
+/// The parser takes a block of newline delimited JSON data and extracts Arrays
+/// of unconverted strings which can be fed to a Converter to obtain a usable Array.
+/// (See also json::Convert() declared in arrow/json/reader.h.)
+///
+/// Instances are created through the static Make() factories; the concrete
+/// parser type is selected by ParseOptions::unexpected_field_behavior.
class ARROW_EXPORT BlockParser {
public:
- BlockParser(MemoryPool* pool, ParseOptions options,
- const std::shared_ptr<Buffer>& scalar_storage);
- BlockParser(ParseOptions options, const std::shared_ptr<Buffer>& scalar_storage);
+ virtual ~BlockParser() = default;
- /// \brief Parse a block of data insitu (destructively)
- /// \warning The input must be null terminated
- Status Parse(const std::shared_ptr<Buffer>& json) { return impl_->Parse(json); }
+ /// \brief Parse a block of data
+ /// The input is no longer parsed in-situ and need not be null terminated.
+ virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
/// \brief Extract parsed data
- Status Finish(std::shared_ptr<Array>* parsed) { return impl_->Finish(parsed); }
+ virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
/// \brief Return the number of parsed rows
- int32_t num_rows() const { return impl_->num_rows(); }
+ int32_t num_rows() const { return num_rows_; }
+
+ /// \brief Create a parser configured by options, allocating from pool
+ static Status Make(MemoryPool* pool, const ParseOptions& options,
+ std::unique_ptr<BlockParser>* out);
- struct Impl {
- virtual ~Impl() = default;
- virtual Status Parse(const std::shared_ptr<Buffer>& json) = 0;
- virtual Status Finish(std::shared_ptr<Array>* parsed) = 0;
- virtual int32_t num_rows() = 0;
- };
+ /// \brief Create a parser configured by options, allocating from default_memory_pool()
+ static Status Make(const ParseOptions& options, std::unique_ptr<BlockParser>* out);
protected:
ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
+ /// Only subclasses construct a BlockParser; users go through Make().
+ explicit BlockParser(MemoryPool* pool) : pool_(pool) {}
+
MemoryPool* pool_;
- const ParseOptions options_;
- std::unique_ptr<Impl> impl_;
+ /// Row count reported by num_rows(); presumably updated by subclasses in Parse().
+ int32_t num_rows_ = 0;
};
} // namespace json
} // namespace arrow
-
-#endif // ARROW_JSON_PARSER_H
diff --git a/cpp/src/arrow/json/api.h b/cpp/src/arrow/json/rapidjson-defs.h
similarity index 51%
copy from cpp/src/arrow/json/api.h
copy to cpp/src/arrow/json/rapidjson-defs.h
index 00fbc2e..68dd0be 100644
--- a/cpp/src/arrow/json/api.h
+++ b/cpp/src/arrow/json/rapidjson-defs.h
@@ -15,10 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_API_H
-#define ARROW_JSON_API_H
+#pragma once
+
+// Include this file before including any RapidJSON headers.
+// It configures RapidJSON (std::string support, C++11 features), relocates it
+// into namespace arrow::rapidjson, and enables SIMD whitespace skipping when
+// arrow/util/sse-util.h reports SSE support.
-#include "arrow/json/options.h"
-#include "arrow/json/reader.h"
+#define RAPIDJSON_HAS_STDSTRING 1
+#define RAPIDJSON_HAS_CXX11_RVALUE_REFS 1
+#define RAPIDJSON_HAS_CXX11_RANGE_FOR 1
-#endif // ARROW_JSON_API_H
+// rapidjson will be defined in namespace arrow::rapidjson
+#define RAPIDJSON_NAMESPACE arrow::rapidjson
+#define RAPIDJSON_NAMESPACE_BEGIN \
+ namespace arrow { \
+ namespace rapidjson {
+#define RAPIDJSON_NAMESPACE_END \
+ } \
+ }
+
+#include "arrow/util/sse-util.h"
+
+// enable SIMD whitespace skipping, if available
+#if defined(ARROW_HAVE_SSE2)
+#define RAPIDJSON_SSE2 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
+
+#if defined(ARROW_HAVE_SSE4_2)
+#define RAPIDJSON_SSE42 1
+#define ARROW_RAPIDJSON_SKIP_WHITESPACE_SIMD 1
+#endif
diff --git a/cpp/src/arrow/json/reader.cc b/cpp/src/arrow/json/reader.cc
index 0e147ab..ebd5488 100644
--- a/cpp/src/arrow/json/reader.cc
+++ b/cpp/src/arrow/json/reader.cc
@@ -18,44 +18,23 @@
#include "arrow/json/reader.h"
#include <unordered_map>
+#include <utility>
+#include <vector>
#include "arrow/array.h"
#include "arrow/builder.h"
+#include "arrow/json/parser.h"
+#include "arrow/table.h"
#include "arrow/type_traits.h"
#include "arrow/util/logging.h"
#include "arrow/util/parsing.h"
+#include "arrow/visitor_inline.h"
namespace arrow {
namespace json {
using internal::StringConverter;
-Kind::type KindFromTag(const std::shared_ptr<const KeyValueMetadata>& tag) {
- std::string kind_name = tag->value(0);
- switch (kind_name[0]) {
- case 'n':
- if (kind_name[2] == 'l') {
- return Kind::kNull;
- } else {
- return Kind::kNumber;
- }
- case 'b':
- return Kind::kBoolean;
- case 's':
- return Kind::kString;
- case 'o':
- return Kind::kObject;
- case 'a':
- return Kind::kArray;
- default:
- ARROW_LOG(FATAL);
- return Kind::kNull;
- }
-}
-
-static Status Convert(const std::shared_ptr<DataType>& out_type,
- std::shared_ptr<Array> in, std::shared_ptr<Array>* out);
-
struct ConvertImpl {
Status Visit(const NullType&) {
*out = in;
@@ -177,8 +156,8 @@ struct ConvertImpl {
std::shared_ptr<Array>* out;
};
-static Status Convert(const std::shared_ptr<DataType>& out_type,
- std::shared_ptr<Array> in, std::shared_ptr<Array>* out) {
+/// Convert an Array produced by BlockParser into an Array of out_type by
+/// dispatching on out_type with VisitTypeInline; ConvertImpl holds the per-type logic.
+/// NOTE(review): out_type is dereferenced unconditionally, so it must be non-null.
+Status Convert(const std::shared_ptr<DataType>& out_type,
+ const std::shared_ptr<Array>& in, std::shared_ptr<Array>* out) {
ConvertImpl visitor = {out_type, in, out};
return VisitTypeInline(*out_type, &visitor);
}
@@ -187,7 +166,7 @@ static Status InferAndConvert(std::shared_ptr<DataType> expected,
const std::shared_ptr<const KeyValueMetadata>& tag,
const std::shared_ptr<Array>& in,
std::shared_ptr<Array>* out) {
- Kind::type kind = KindFromTag(tag);
+ Kind::type kind = Kind::FromTag(tag);
switch (kind) {
case Kind::kObject: {
// FIXME(bkietz) in general expected fields may not be an exact prefix of parsed's
@@ -269,18 +248,20 @@ static Status InferAndConvert(std::shared_ptr<DataType> expected,
Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
std::shared_ptr<RecordBatch>* out) {
- BlockParser parser(default_memory_pool(), options, json);
- RETURN_NOT_OK(parser.Parse(json));
+ std::unique_ptr<BlockParser> parser;
+ RETURN_NOT_OK(BlockParser::Make(options, &parser));
+ RETURN_NOT_OK(parser->Parse(json));
std::shared_ptr<Array> parsed;
- RETURN_NOT_OK(parser.Finish(&parsed));
+ RETURN_NOT_OK(parser->Finish(&parsed));
std::shared_ptr<Array> converted;
auto schm = options.explicit_schema;
if (options.unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
if (schm) {
- RETURN_NOT_OK(InferAndConvert(struct_(schm->fields()), Tag(Kind::kObject), parsed,
- &converted));
+ RETURN_NOT_OK(InferAndConvert(struct_(schm->fields()), Kind::Tag(Kind::kObject),
+ parsed, &converted));
} else {
- RETURN_NOT_OK(InferAndConvert(nullptr, Tag(Kind::kObject), parsed, &converted));
+ RETURN_NOT_OK(
+ InferAndConvert(nullptr, Kind::Tag(Kind::kObject), parsed, &converted));
}
schm = schema(converted->type()->children());
} else {
diff --git a/cpp/src/arrow/json/reader.h b/cpp/src/arrow/json/reader.h
index 6463793..51a3473 100644
--- a/cpp/src/arrow/json/reader.h
+++ b/cpp/src/arrow/json/reader.h
@@ -15,23 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_JSON_READER_H
-#define ARROW_JSON_READER_H
+#pragma once
#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-#include "arrow/json/options.h" // IWYU pragma: keep
-#include "arrow/json/parser.h" // IWYU pragma: keep
+#include "arrow/json/options.h"
#include "arrow/status.h"
+#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
namespace arrow {
+class Buffer;
class MemoryPool;
class Table;
+class RecordBatch;
+class Array;
+class DataType;
namespace io {
class InputStream;
@@ -45,7 +45,6 @@ class ARROW_EXPORT TableReader {
virtual Status Read(std::shared_ptr<Table>* out) = 0;
- // XXX pass optional schema?
static Status Make(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&,
std::shared_ptr<TableReader>* out);
@@ -54,7 +53,10 @@ class ARROW_EXPORT TableReader {
ARROW_EXPORT Status ParseOne(ParseOptions options, std::shared_ptr<Buffer> json,
std::shared_ptr<RecordBatch>* out);
+/// \brief convert an Array produced by BlockParser into an Array of out_type
+ARROW_EXPORT Status Convert(const std::shared_ptr<DataType>& out_type,
+ const std::shared_ptr<Array>& in,
+ std::shared_ptr<Array>* out);
+
} // namespace json
} // namespace arrow
-
-#endif // ARROW_JSON_READER_H
diff --git a/cpp/src/arrow/json/test-common.h b/cpp/src/arrow/json/test-common.h
index 0e7f6e3..16f43eb 100644
--- a/cpp/src/arrow/json/test-common.h
+++ b/cpp/src/arrow/json/test-common.h
@@ -15,36 +15,47 @@
// specific language governing permissions and limitations
// under the License.
-#include <algorithm>
#include <memory>
#include <random>
#include <string>
#include <vector>
-#include <rapidjson/writer.h>
+#include "arrow/json/rapidjson-defs.h"
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/reader.h"
+#include "rapidjson/writer.h"
+#include "arrow/io/memory.h"
+#include "arrow/json/options.h"
+#include "arrow/json/parser.h"
#include "arrow/testing/gtest_util.h"
-#include "arrow/testing/util.h"
+#include "arrow/type.h"
#include "arrow/util/string_view.h"
#include "arrow/visitor_inline.h"
namespace arrow {
namespace json {
-using rapidjson::StringBuffer;
-using Writer = rapidjson::Writer<StringBuffer>;
+namespace rj = arrow::rapidjson;
+
+using rj::StringBuffer;
+using util::string_view;
+using Writer = rj::Writer<StringBuffer>;
inline static Status OK(bool ok) { return ok ? Status::OK() : Status::Invalid(""); }
template <typename Engine>
-static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer);
+inline static Status Generate(const std::shared_ptr<DataType>& type, Engine& e,
+ Writer* writer);
template <typename Engine>
-static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
- Writer* writer);
+inline static Status Generate(const std::vector<std::shared_ptr<Field>>& fields,
+ Engine& e, Writer* writer);
template <typename Engine>
-static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e, Writer* writer) {
+inline static Status Generate(const std::shared_ptr<Schema>& schm, Engine& e,
+ Writer* writer) {
return Generate(schm->fields(), e, writer);
}
@@ -89,16 +100,17 @@ struct GenerateImpl {
Status Visit(const ListType& t) {
auto size = std::poisson_distribution<>{4}(e);
writer.StartArray();
- for (int i = 0; i != size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
+ for (int i = 0; i < size; ++i) RETURN_NOT_OK(Generate(t.value_type(), e, &writer));
return OK(writer.EndArray(size));
}
Status Visit(const StructType& t) { return Generate(t.children(), e, &writer); }
Engine& e;
- rapidjson::Writer<rapidjson::StringBuffer>& writer;
+ rj::Writer<rj::StringBuffer>& writer;
};
template <typename Engine>
-static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer* writer) {
+inline static Status Generate(const std::shared_ptr<DataType>& type, Engine& e,
+ Writer* writer) {
if (std::uniform_real_distribution<>{0, 1}(e) < .2) {
// one out of 5 chance of null, anywhere
writer->Null();
@@ -109,8 +121,8 @@ static Status Generate(const std::shared_ptr<DataType>& type, Engine& e, Writer*
}
template <typename Engine>
-static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine& e,
- Writer* writer) {
+inline static Status Generate(const std::vector<std::shared_ptr<Field>>& fields,
+ Engine& e, Writer* writer) {
RETURN_NOT_OK(OK(writer->StartObject()));
for (const auto& f : fields) {
writer->Key(f->name().c_str());
@@ -119,21 +131,22 @@ static Status Generate(const std::vector<std::shared_ptr<Field>>& fields, Engine
return OK(writer->EndObject(static_cast<int>(fields.size())));
}
-Status MakeBuffer(util::string_view data, std::shared_ptr<Buffer>* out) {
- RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), data.size(), out));
- std::copy(std::begin(data), std::end(data), (*out)->mutable_data());
+/// Wrap src_str in an in-memory io::InputStream (a BufferReader).
+/// NOTE(review): unlike the removed MakeBuffer, which copied into a freshly
+/// allocated buffer, the Buffer here is built directly from the view and appears
+/// not to own the bytes -- the caller must keep src_str's storage alive for as
+/// long as the stream is read.
+inline static Status MakeStream(string_view src_str,
+ std::shared_ptr<io::InputStream>* out) {
+ auto src = std::make_shared<Buffer>(src_str);
+ *out = std::make_shared<io::BufferReader>(src);
return Status::OK();
}
// scalar values (numbers and strings) are parsed into a
// dictionary<index:int32, value:string>. This can be decoded for ease of comparison
-Status DecodeStringDictionary(const DictionaryArray& dict_array,
- std::shared_ptr<Array>* decoded) {
+inline static Status DecodeStringDictionary(const DictionaryArray& dict_array,
+ std::shared_ptr<Array>* decoded) {
const StringArray& dict = static_cast<const StringArray&>(*dict_array.dictionary());
const Int32Array& indices = static_cast<const Int32Array&>(*dict_array.indices());
StringBuilder builder;
RETURN_NOT_OK(builder.Resize(indices.length()));
- for (int64_t i = 0; i != indices.length(); ++i) {
+ for (int64_t i = 0; i < indices.length(); ++i) {
if (indices.IsNull(i)) {
builder.UnsafeAppendNull();
continue;
@@ -145,5 +158,23 @@ Status DecodeStringDictionary(const DictionaryArray& dict_array,
return builder.Finish(decoded);
}
+/// Parse src_str as a single block with a fresh BlockParser configured by
+/// options, and return the parser's unconverted output in *parsed.
+/// The Buffer wraps src_str without copying. This is presumably safe because
+/// parsing is no longer in-situ and the parser finishes before this returns.
+inline static Status ParseFromString(ParseOptions options, string_view src_str,
+ std::shared_ptr<Array>* parsed) {
+ auto src = std::make_shared<Buffer>(src_str);
+ std::unique_ptr<BlockParser> parser;
+ RETURN_NOT_OK(BlockParser::Make(options, &parser));
+ RETURN_NOT_OK(parser->Parse(src));
+ return parser->Finish(parsed);
+}
+
+/// Re-format a single-line JSON document with indentation, for readable test
+/// failure messages. If one_line is not valid JSON it is returned unchanged.
+///
+/// Declared inline static (like the other helpers in this header) so that
+/// including the header from several test translation units does not produce
+/// duplicate definitions.
+inline static std::string PrettyPrint(string_view one_line) {
+ rj::Document document;
+ // string_view is not guaranteed to be null terminated: pass the length explicitly
+ document.Parse(one_line.data(), one_line.size());
+ if (document.HasParseError()) {
+ return std::string(one_line.data(), one_line.size());
+ }
+ rj::StringBuffer sb;
+ rj::PrettyWriter<rj::StringBuffer> writer(sb);
+ document.Accept(writer);
+ return sb.GetString();
+}
+
} // namespace json
} // namespace arrow
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index a8d6214..2e9483a 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -299,69 +299,84 @@ struct is_8bit_int {
(std::is_same<UInt8Type, T>::value || std::is_same<Int8Type, T>::value);
};
-template <typename T>
-using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_8bit_int = typename std::enable_if<is_8bit_int<T>::value, R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_primitive_ctype =
- typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value>::type;
+ typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value, R>::type;
-template <typename T>
-using enable_if_date = typename std::enable_if<std::is_base_of<DateType, T>::value>::type;
-
-template <typename T, typename U = void>
+template <typename T, typename R = void>
using enable_if_integer =
- typename std::enable_if<std::is_base_of<Integer, T>::value, U>::type;
+ typename std::enable_if<std::is_base_of<Integer, T>::value, R>::type;
+/// True when T is an Arrow integer type whose c_type is signed.
+/// NOTE(review): `typename T::c_type` is formed even when T is not an Integer,
+/// so naming is_signed_integer<T>::value directly for a type without c_type is
+/// presumably ill-formed rather than false. Within enable_if_signed_integer the
+/// failure occurs during alias substitution and should be SFINAE-friendly.
template <typename T>
+using is_signed_integer =
+ std::integral_constant<bool, std::is_base_of<Integer, T>::value &&
+ std::is_signed<typename T::c_type>::value>;
+
+template <typename T, typename R = void>
using enable_if_signed_integer =
- typename std::enable_if<std::is_base_of<Integer, T>::value &&
- std::is_signed<typename T::c_type>::value>::type;
+ typename std::enable_if<is_signed_integer<T>::value, R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_unsigned_integer =
typename std::enable_if<std::is_base_of<Integer, T>::value &&
- std::is_unsigned<typename T::c_type>::value>::type;
+ std::is_unsigned<typename T::c_type>::value,
+ R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_floating_point =
- typename std::enable_if<std::is_base_of<FloatingPoint, T>::value>::type;
+ typename std::enable_if<std::is_base_of<FloatingPoint, T>::value, R>::type;
template <typename T>
-using enable_if_time = typename std::enable_if<std::is_base_of<TimeType, T>::value>::type;
+using is_date = std::is_base_of<DateType, T>;
-template <typename T>
-using enable_if_timestamp =
- typename std::enable_if<std::is_base_of<TimestampType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_date = typename std::enable_if<is_date<T>::value, R>::type;
template <typename T>
-using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value>::type;
+using is_time = std::is_base_of<TimeType, T>;
-template <typename T>
-using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_time = typename std::enable_if<is_time<T>::value, R>::type;
template <typename T>
+using is_timestamp = std::is_base_of<TimestampType, T>;
+
+template <typename T, typename R = void>
+using enable_if_timestamp = typename std::enable_if<is_timestamp<T>::value, R>::type;
+
+template <typename T, typename R = void>
+using enable_if_has_c_type = typename std::enable_if<has_c_type<T>::value, R>::type;
+
+template <typename T, typename R = void>
+using enable_if_null = typename std::enable_if<std::is_same<NullType, T>::value, R>::type;
+
+template <typename T, typename R = void>
using enable_if_binary =
- typename std::enable_if<std::is_base_of<BinaryType, T>::value>::type;
+ typename std::enable_if<std::is_base_of<BinaryType, T>::value, R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_boolean =
- typename std::enable_if<std::is_same<BooleanType, T>::value>::type;
+ typename std::enable_if<std::is_same<BooleanType, T>::value, R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_binary_like =
typename std::enable_if<std::is_base_of<BinaryType, T>::value ||
- std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+ std::is_base_of<FixedSizeBinaryType, T>::value,
+ R>::type;
-template <typename T>
+template <typename T, typename R = void>
using enable_if_fixed_size_binary =
- typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value>::type;
+ typename std::enable_if<std::is_base_of<FixedSizeBinaryType, T>::value, R>::type;
-template <typename T>
-using enable_if_list = typename std::enable_if<std::is_base_of<ListType, T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_list =
+ typename std::enable_if<std::is_base_of<ListType, T>::value, R>::type;
-template <typename T>
-using enable_if_number = typename std::enable_if<is_number<T>::value>::type;
+template <typename T, typename R = void>
+using enable_if_number = typename std::enable_if<is_number<T>::value, R>::type;
namespace detail {
diff --git a/cpp/src/arrow/util/parsing.h b/cpp/src/arrow/util/parsing.h
index 9bd1023..dd3c074 100644
--- a/cpp/src/arrow/util/parsing.h
+++ b/cpp/src/arrow/util/parsing.h
@@ -55,6 +55,8 @@ class StringConverter;
template <>
class StringConverter<BooleanType> {
public:
+ explicit StringConverter(const std::shared_ptr<DataType>& = NULLPTR) {}
+
using value_type = bool;
bool operator()(const char* s, size_t length, value_type* out) {
@@ -97,7 +99,7 @@ class StringToFloatConverterMixin {
public:
using value_type = typename ARROW_TYPE::c_type;
- StringToFloatConverterMixin()
+ explicit StringToFloatConverterMixin(const std::shared_ptr<DataType>& = NULLPTR)
: main_converter_(flags_, main_junk_value_, main_junk_value_, "inf", "nan"),
fallback_converter_(flags_, fallback_junk_value_, fallback_junk_value_, "inf",
"nan") {}
@@ -148,10 +150,14 @@ class StringToFloatConverterMixin {
};
template <>
-class StringConverter<FloatType> : public StringToFloatConverterMixin<FloatType> {};
+class StringConverter<FloatType> : public StringToFloatConverterMixin<FloatType> {
+ using StringToFloatConverterMixin<FloatType>::StringToFloatConverterMixin;
+};
template <>
-class StringConverter<DoubleType> : public StringToFloatConverterMixin<DoubleType> {};
+class StringConverter<DoubleType> : public StringToFloatConverterMixin<DoubleType> {
+ using StringToFloatConverterMixin<DoubleType>::StringToFloatConverterMixin;
+};
// NOTE: HalfFloatType would require a half<->float conversion library
@@ -277,6 +283,9 @@ class StringToUnsignedIntConverterMixin {
public:
using value_type = typename ARROW_TYPE::c_type;
+ explicit StringToUnsignedIntConverterMixin(const std::shared_ptr<DataType>& = NULLPTR) {
+ }
+
bool operator()(const char* s, size_t length, value_type* out) {
if (ARROW_PREDICT_FALSE(length == 0)) {
return false;
@@ -291,18 +300,23 @@ class StringToUnsignedIntConverterMixin {
};
template <>
-class StringConverter<UInt8Type> : public StringToUnsignedIntConverterMixin<UInt8Type> {};
+class StringConverter<UInt8Type> : public StringToUnsignedIntConverterMixin<UInt8Type> {
+ using StringToUnsignedIntConverterMixin<UInt8Type>::StringToUnsignedIntConverterMixin;
+};
template <>
class StringConverter<UInt16Type> : public StringToUnsignedIntConverterMixin<UInt16Type> {
+ using StringToUnsignedIntConverterMixin<UInt16Type>::StringToUnsignedIntConverterMixin;
};
template <>
class StringConverter<UInt32Type> : public StringToUnsignedIntConverterMixin<UInt32Type> {
+ using StringToUnsignedIntConverterMixin<UInt32Type>::StringToUnsignedIntConverterMixin;
};
template <>
class StringConverter<UInt64Type> : public StringToUnsignedIntConverterMixin<UInt64Type> {
+ using StringToUnsignedIntConverterMixin<UInt64Type>::StringToUnsignedIntConverterMixin;
};
template <class ARROW_TYPE>
@@ -311,6 +325,8 @@ class StringToSignedIntConverterMixin {
using value_type = typename ARROW_TYPE::c_type;
using unsigned_type = typename std::make_unsigned<value_type>::type;
+ explicit StringToSignedIntConverterMixin(const std::shared_ptr<DataType>& = NULLPTR) {}
+
bool operator()(const char* s, size_t length, value_type* out) {
static constexpr unsigned_type max_positive =
static_cast<unsigned_type>(std::numeric_limits<value_type>::max());
@@ -356,16 +372,24 @@ class StringToSignedIntConverterMixin {
};
template <>
-class StringConverter<Int8Type> : public StringToSignedIntConverterMixin<Int8Type> {};
+class StringConverter<Int8Type> : public StringToSignedIntConverterMixin<Int8Type> {
+ using StringToSignedIntConverterMixin<Int8Type>::StringToSignedIntConverterMixin;
+};
template <>
-class StringConverter<Int16Type> : public StringToSignedIntConverterMixin<Int16Type> {};
+class StringConverter<Int16Type> : public StringToSignedIntConverterMixin<Int16Type> {
+ using StringToSignedIntConverterMixin<Int16Type>::StringToSignedIntConverterMixin;
+};
template <>
-class StringConverter<Int32Type> : public StringToSignedIntConverterMixin<Int32Type> {};
+class StringConverter<Int32Type> : public StringToSignedIntConverterMixin<Int32Type> {
+ using StringToSignedIntConverterMixin<Int32Type>::StringToSignedIntConverterMixin;
+};
template <>
-class StringConverter<Int64Type> : public StringToSignedIntConverterMixin<Int64Type> {};
+class StringConverter<Int64Type> : public StringToSignedIntConverterMixin<Int64Type> {
+ using StringToSignedIntConverterMixin<Int64Type>::StringToSignedIntConverterMixin;
+};
template <>
class StringConverter<TimestampType> {
diff --git a/cpp/src/arrow/util/sse-util.h b/cpp/src/arrow/util/sse-util.h
index 15e7c99..b470b41 100644
--- a/cpp/src/arrow/util/sse-util.h
+++ b/cpp/src/arrow/util/sse-util.h
@@ -18,11 +18,9 @@
// From Apache Impala as of 2016-01-29. Pared down to a minimal set of
// functions needed for parquet-cpp
-#ifndef ARROW_UTIL_SSE_UTIL_H
-#define ARROW_UTIL_SSE_UTIL_H
+#pragma once
-#undef ARROW_HAVE_SSE2
-#undef ARROW_HAVE_SSE4_2
+#include "arrow/util/macros.h"
#ifdef ARROW_USE_SIMD
@@ -36,7 +34,7 @@
// gcc/clang (possibly others)
-#if defined(__SSE4_2__)
+#if defined(__SSE2__)
#define ARROW_HAVE_SSE2 1
#include <emmintrin.h>
#endif
@@ -46,7 +44,9 @@
#include <nmmintrin.h>
#endif
-#endif
+#endif // ARROW_USE_SIMD
+
+// MSVC x86-64: not detected above (MSVC does not define __SSE2__ / __SSE4_2__); TODO
namespace arrow {
@@ -155,5 +155,3 @@ static inline uint32_t SSE4_crc32_u64(uint32_t, uint64_t) {
#endif // ARROW_HAVE_SSE4_2
} // namespace arrow
-
-#endif // ARROW_UTIL_SSE_UTIL_H