You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/17 23:41:57 UTC

[GitHub] [arrow] wjones127 commented on a diff in pull request #14355: ARROW-17932: [C++] Implement streaming RecordBatchReader for JSON

wjones127 commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1025816181


##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,447 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<RecordBatch>> ToRecordBatch(const StructArray& converted) {

Review Comment:
   There exists `RecordBatch::FromStructArray()`. Could you use that instead?



##########
cpp/src/arrow/json/reader.h:
##########
@@ -60,5 +50,49 @@ class ARROW_EXPORT TableReader {
 ARROW_EXPORT Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
                                                            std::shared_ptr<Buffer> json);
 
+/// \brief A class that reads a JSON file incrementally
+///
+/// JSON data is read from a stream in fixed-size blocks (configurable with
+/// `ReadOptions::block_size`). Each block is converted to a `RecordBatch`. Yielded
+/// batches have a consistent schema but may differ in row count.
+///
+/// The supplied `ParseOptions` are used to determine a schema on the first non-empty
+/// block. Afterwards, the schema is frozen and unexpected fields will be ignored on
+/// subsequent reads (unless `UnexpectedFieldBehavior::Error` was specified).

Review Comment:
   Would we consider allowing a `schema` to be passed? I could see that being preferable in many cases than just taking what is see in the first block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org