Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/30 18:44:08 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #14355: ARROW-17932: [C++] Implement streaming RecordBatchReader for JSON

pitrou commented on code in PR #14355:
URL: https://github.com/apache/arrow/pull/14355#discussion_r1036151325


##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;

Review Comment:
   This is probably harmless, but is there a situation where `partial_` would not already be null here?
   If `buffer_` is null now, then either this is the very first call (both members are still default-null), or the previous call saw a null `next_buffer`, took the end-of-file branch, and left `next_partial` (and therefore `partial_`) null as well.
   So perhaps instead make this:
   ```suggestion
           DCHECK_EQ(partial_, nullptr) << "Logic error: non-null partial with null buffer";
   ```
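   For reference, an annotated excerpt of the end-of-file branch above (illustrative only, not a suggested change) showing why both members end up null together:
   ```cpp
   // Previous call, with buffer_ != nullptr and next_buffer == nullptr:
   std::shared_ptr<Buffer> whole, completion, next_partial;   // all start out null
   RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
   // next_partial is never assigned on this path...
   buffer_ = std::move(next_buffer);                           // buffer_ becomes null
   return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),  // partial_ becomes null
                                      std::move(completion), std::move(whole),
                                      index_++});
   // ...so the next call enters the `!buffer_` branch with partial_ already null.
   ```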



##########
cpp/src/arrow/json/reader.h:
##########
@@ -19,25 +19,15 @@
 
 #include <memory>
 
+#include "arrow/io/interfaces.h"

Review Comment:
   The forward decls should be enough here?
   ```suggestion
   #include "arrow/io/type_fwd.h"
   ```
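   (A minimal, self-contained illustration of why the forward declarations suffice, not the actual header; `MakeStreamingReader` is a hypothetical name. The point is that the reader only refers to `io::InputStream` behind a `std::shared_ptr` in declarations.)
   ```cpp
   #include <memory>

   namespace arrow::io {
   class InputStream;  // what arrow/io/type_fwd.h provides, among other declarations
   }  // namespace arrow::io

   namespace arrow::json {
   class StreamingReader;

   // Declaring a factory like this needs no complete io::InputStream definition.
   std::shared_ptr<StreamingReader> MakeStreamingReader(
       std::shared_ptr<arrow::io::InputStream> stream);
   }  // namespace arrow::json
   ```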



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source().Then(
+        [state](const ChunkedBlock& block) -> Result<DecodedBlock> {
+          if (IsIterationEnd(block)) {
+            return IterationEnd<DecodedBlock>();
+          } else {
+            return state->decoder(block);
+          }
+        },
+        {}, options);
+  };
+}
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, pool_));
-      }
-      RETURN_NOT_OK(parser->Parse(straddling));
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> source,
+                      const std::shared_ptr<DecodeContext>& context, int max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
     }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      AsyncGenerator<ChunkedBlock> chunking_gen, std::shared_ptr<DecodeContext> context,
+      Executor* cpu_executor, bool use_threads) {
+    auto source = MakeDecodingGenerator(std::move(chunking_gen),
+                                        DecodingOperator(context), cpu_executor);
+    const int max_readahead = use_threads ? cpu_executor->GetCapacity() : 0;
+    return FirstBlock(source).Then([source = std::move(source),
+                                    context = std::move(context),
+                                    max_readahead](const DecodedBlock& block) {
+      return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
+                                                   max_readahead);
+    });
+  }
+
+  [[nodiscard]] std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    auto result = ReadNextAsync().result();
+    return std::move(result).Value(out);
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    // On the first call, return the batch we used for initialization
+    if (ARROW_PREDICT_FALSE(first_block_)) {
+      bytes_processed_->fetch_add(first_block_->num_bytes);
+      auto batch = std::exchange(first_block_, std::nullopt)->record_batch;
+      return ToFuture(std::move(batch));
     }
+    return generator_();
+  }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+  [[nodiscard]] int64_t bytes_read() const override { return bytes_processed_->load(); }
+
+ private:
+  static Future<DecodedBlock> FirstBlock(AsyncGenerator<DecodedBlock> gen) {
+    // Read from the stream until we get a non-empty record batch that we can use to
+    // declare the schema. Along the way, accumulate the bytes read so they can be
+    // recorded on the first `ReadNextAsync`
+    auto out = std::make_shared<DecodedBlock>();
+    DCHECK_EQ(out->num_bytes, 0);

Review Comment:
   Hmm, this seems trivially true? :-)



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source().Then(
+        [state](const ChunkedBlock& block) -> Result<DecodedBlock> {
+          if (IsIterationEnd(block)) {
+            return IterationEnd<DecodedBlock>();
+          } else {
+            return state->decoder(block);
+          }
+        },
+        {}, options);
+  };
+}
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, pool_));
-      }
-      RETURN_NOT_OK(parser->Parse(straddling));
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> source,
+                      const std::shared_ptr<DecodeContext>& context, int max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
     }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      AsyncGenerator<ChunkedBlock> chunking_gen, std::shared_ptr<DecodeContext> context,
+      Executor* cpu_executor, bool use_threads) {
+    auto source = MakeDecodingGenerator(std::move(chunking_gen),
+                                        DecodingOperator(context), cpu_executor);
+    const int max_readahead = use_threads ? cpu_executor->GetCapacity() : 0;
+    return FirstBlock(source).Then([source = std::move(source),
+                                    context = std::move(context),
+                                    max_readahead](const DecodedBlock& block) {
+      return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
+                                                   max_readahead);
+    });
+  }
+
+  [[nodiscard]] std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    auto result = ReadNextAsync().result();
+    return std::move(result).Value(out);
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    // On the first call, return the batch we used for initialization
+    if (ARROW_PREDICT_FALSE(first_block_)) {
+      bytes_processed_->fetch_add(first_block_->num_bytes);
+      auto batch = std::exchange(first_block_, std::nullopt)->record_batch;
+      return ToFuture(std::move(batch));
     }
+    return generator_();
+  }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+  [[nodiscard]] int64_t bytes_read() const override { return bytes_processed_->load(); }
+
+ private:
+  static Future<DecodedBlock> FirstBlock(AsyncGenerator<DecodedBlock> gen) {
+    // Read from the stream until we get a non-empty record batch that we can use to
+    // declare the schema. Along the way, accumulate the bytes read so they can be
+    // recorded on the first `ReadNextAsync`
+    auto out = std::make_shared<DecodedBlock>();

Review Comment:
   Can probably move this inside the `loop_body`?



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",

Review Comment:
   I might be missing something, but what is specifically bad about this chunk? Does RapidJSON not allow additional whitespace in JSON?



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_read(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])");

Review Comment:
   Hmm... is this desired? There's an additional field "c" in this block that gets ignored, which should only happen with `UnexpectedFieldBehavior::Ignore`. With `UnexpectedFieldBehavior::InferType`, the schema should be frozen after the first block and the reader should error out on subsequent unexpected fields.
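
   A rough sketch of the behavior I would have expected here, reusing this fixture's helpers (the exact error type raised is an assumption on my part):

   ```cpp
   // Hypothetical expectation for UnexpectedFieldBehavior::InferType: the schema is
   // inferred from the first block only, so a later block that introduces "c" would
   // surface an error instead of silently dropping the field.
   parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
   parse_options_.explicit_schema = nullptr;
   read_options_.block_size = 32;  // one JSON line per block, as in this test

   ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
   ASSERT_EQ(*reader->schema(), *schema({field("a", int64()), field("b", utf8())}));

   std::shared_ptr<RecordBatch> batch;
   AssertReadNext(reader, &batch);                    // {"a": 0, "b": "foo"} -> ok
   ASSERT_RAISES(Invalid, reader->ReadNext(&batch));  // {"a": 1, "c": true} -> unexpected "c"
   ```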



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_read(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 2, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 2, "b": null, "c": null}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 1);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_read(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  EXPECT_EQ(batches.back(), nullptr);
+  batches.pop_back();
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches);
+  std::weak_ptr<StreamingReader> weak_reader;
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    weak_reader = reader;
+    EXPECT_EQ(reader->bytes_read(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  auto all_future = All(std::move(futures));
+  AssertNotFinished(all_future);
+  EXPECT_EQ(weak_reader.use_count(), 0);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, all_future);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, NestedParallelism) {

Review Comment:
   Hmm... I'm not sure what this is meant to test, apart from the fact that one can spawn a task on a thread pool that collects results from a generator running on another thread pool. This seems pretty generic and not JSON-specific at all.



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",

Review Comment:
   Ok, it took me some time to understand that it would overflow the block size :-) 
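
   For other readers, a quick sketch of the arithmetic as I understand it (the exact chunker failure mode is an assumption):

   ```cpp
   // With block_size = 10, the space-padded first object plus its newline is longer
   // than two blocks, so it cannot be assembled from a single partial + completion
   // pair and MakeReader fails before any batch is produced.
   const std::string padded_first_row = R"({"i": 0            })" "\n";
   constexpr int64_t kBlockSize = 10;
   // padded_first_row.size() > 2 * kBlockSize, i.e. the object straddles two block boundaries
   ```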



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    options.should_schedule = ShouldSchedule::Always;

Review Comment:
   Perhaps add a comment why (am I getting it right?)
   ```suggestion
       // Since the decode step is heavy, we want to schedule it as a separate
       // task in order to maximize task distribution across CPU cores
       options.should_schedule = ShouldSchedule::Always;
   ```



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);

Review Comment:
   `AssertSchemaEqual`



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently, but reads from the `DecodeContext` aren't synchronized.
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source().Then(
+        [state](const ChunkedBlock& block) -> Result<DecodedBlock> {
+          if (IsIterationEnd(block)) {
+            return IterationEnd<DecodedBlock>();
+          } else {
+            return state->decoder(block);
+          }
+        },
+        {}, options);
+  };
+}
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, pool_));
-      }
-      RETURN_NOT_OK(parser->Parse(straddling));
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> source,
+                      const std::shared_ptr<DecodeContext>& context, int max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
     }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      AsyncGenerator<ChunkedBlock> chunking_gen, std::shared_ptr<DecodeContext> context,
+      Executor* cpu_executor, bool use_threads) {
+    auto source = MakeDecodingGenerator(std::move(chunking_gen),
+                                        DecodingOperator(context), cpu_executor);
+    const int max_readahead = use_threads ? cpu_executor->GetCapacity() : 0;
+    return FirstBlock(source).Then([source = std::move(source),
+                                    context = std::move(context),
+                                    max_readahead](const DecodedBlock& block) {
+      return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
+                                                   max_readahead);
+    });
+  }
+
+  [[nodiscard]] std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    auto result = ReadNextAsync().result();
+    return std::move(result).Value(out);
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    // On the first call, return the batch we used for initialization
+    if (ARROW_PREDICT_FALSE(first_block_)) {
+      bytes_processed_->fetch_add(first_block_->num_bytes);
+      auto batch = std::exchange(first_block_, std::nullopt)->record_batch;
+      return ToFuture(std::move(batch));
     }
+    return generator_();
+  }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+  [[nodiscard]] int64_t bytes_read() const override { return bytes_processed_->load(); }
+
+ private:
+  static Future<DecodedBlock> FirstBlock(AsyncGenerator<DecodedBlock> gen) {
+    // Read from the stream until we get a non-empty record batch that we can use to
+    // declare the schema. Along the way, accumulate the bytes read so they can be
+    // recorded on the first `ReadNextAsync`
+    auto out = std::make_shared<DecodedBlock>();
+    DCHECK_EQ(out->num_bytes, 0);
+    auto loop_body = [gen = std::move(gen),
+                      out = std::move(out)]() -> Future<ControlFlow<DecodedBlock>> {
+      return gen().Then(
+          [out](const DecodedBlock& block) -> Result<ControlFlow<DecodedBlock>> {
+            if (IsIterationEnd(block)) {
+              return Status::Invalid("Empty JSON stream");
+            }
+            out->num_bytes += block.num_bytes;
+            if (block.record_batch->num_rows() == 0) {
+              return Continue();
+            }
+            out->record_batch = block.record_batch;
+            return Break(*out);
+          });
+    };
+    return Loop(std::move(loop_body));
   }
 
-  MemoryPool* pool_;
-  ReadOptions read_options_;
-  ParseOptions parse_options_;
-  std::unique_ptr<Chunker> chunker_;
-  std::shared_ptr<TaskGroup> task_group_;
-  Iterator<std::shared_ptr<Buffer>> block_iterator_;
-  std::shared_ptr<ChunkedArrayBuilder> builder_;
+  std::optional<DecodedBlock> first_block_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<std::atomic<int64_t>> bytes_processed_;
+  AsyncGenerator<std::shared_ptr<RecordBatch>> generator_;
 };
 
+template <typename T>
+Result<AsyncGenerator<T>> MakeReentrantGenerator(AsyncGenerator<T> source) {
+  struct State {
+    AsyncGenerator<T> source;
+    std::shared_ptr<ThreadPool> thread_pool;
+  } state{std::move(source), nullptr};
+  ARROW_ASSIGN_OR_RAISE(state.thread_pool, ThreadPool::Make(1));
+
+  return [state = std::make_shared<State>(std::move(state))]() -> Future<T> {
+    auto maybe_future =
+        state->thread_pool->Submit([state] { return state->source().result(); });
+    return DeferNotOk(std::move(maybe_future));
+  };
+}
+
+// Compose an async-reentrant `ChunkedBlock` generator using a sequentially-accessed
+// `InputStream`
+Result<AsyncGenerator<ChunkedBlock>> MakeChunkingGenerator(

Review Comment:
   This creates a second, distinct function named `MakeChunkingGenerator`. Can we instead fold this one into `StreamingReader::MakeAsync`?
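
   For example, a rough sketch of the fold (illustrative only: it assumes `MakeAsync` would take the raw buffer generator directly, and it leaves out whatever reentrancy wrapping the current helper adds around the input stream):

      static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
          AsyncGenerator<std::shared_ptr<Buffer>> buffer_gen,
          std::shared_ptr<DecodeContext> context, Executor* cpu_executor,
          bool use_threads) {
        // Build the chunking stage inline instead of via a second
        // MakeChunkingGenerator overload.
        auto chunking_gen = MakeTransformedGenerator(
            std::move(buffer_gen),
            ChunkingTransformer::Make(MakeChunker(context->parse_options())));
        auto source = MakeDecodingGenerator(std::move(chunking_gen),
                                            DecodingOperator(context), cpu_executor);
        const int max_readahead = use_threads ? cpu_executor->GetCapacity() : 0;
        return FirstBlock(source).Then([source = std::move(source),
                                        context = std::move(context),
                                        max_readahead](const DecodedBlock& block) {
          return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
                                                       max_readahead);
        });
      }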



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size, i.e. a higher multiplier
+  // means fewer batches.
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);

Review Comment:
   `AssertSchemaEqual` would give better diagnostics on error.
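
   For example (assuming the `AssertSchemaEqual` helper from `arrow/testing/gtest_util.h`):

      ASSERT_NE(reader->schema(), nullptr);
      AssertSchemaEqual(*reader->schema(), *test_schema);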



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(

Review Comment:
   Add a comment that the returned generator won't be async-reentrant (because of the non-trivial state mutations in `ChunkingTransformer`)?
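
   Something along these lines, for instance (wording is only a suggestion):

      // Note: the returned generator is not async-reentrant. ChunkingTransformer mutates
      // its partial_/buffer_ state on every call, so the next value must not be pulled
      // before the previous future completes. Wrap it (e.g. with MakeReentrantGenerator,
      // or add readahead downstream) if reentrant access is required.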



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size, i.e. a higher multiplier
+  // means fewer batches.
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {

Review Comment:
   Note that parsing errors should also be chunking errors if `newlines_in_values` is true...
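
   A hedged sketch of an extra case covering that (it assumes, as with the current `bad_middle_block` case, that the reader still constructs fine and the Invalid error only surfaces on read; that assumption is unverified):

      parse_options_.newlines_in_values = true;
      read_options_.block_size = 16;
      ASSERT_OK_AND_ASSIGN(auto nl_reader, MakeReader(bad_middle_block));
      std::shared_ptr<RecordBatch> nl_batch;
      // The first well-formed row should still come through; the malformed row should
      // then be reported as Invalid, whether the chunker or the parser rejects it.
      AssertReadNext(nl_reader, &nl_batch);
      ASSERT_RAISES(Invalid, nl_reader->ReadNext(&nl_batch));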



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size, i.e. a higher multiplier
+  // means fewer batches.
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_read(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 2, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 2, "b": null, "c": null}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 1);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_read(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  EXPECT_EQ(batches.back(), nullptr);
+  batches.pop_back();
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches);
+  std::weak_ptr<StreamingReader> weak_reader;
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    weak_reader = reader;
+    EXPECT_EQ(reader->bytes_read(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  auto all_future = All(std::move(futures));
+  AssertNotFinished(all_future);
+  EXPECT_EQ(weak_reader.use_count(), 0);

Review Comment:
   I'm not sure this test is useful. The interesting thing it does test is that the futures don't crash when we wait on them after the local reader pointer is gone.



##########
cpp/src/arrow/json/reader.cc:
##########
@@ -42,132 +42,435 @@ namespace arrow {
 using std::string_view;
 
 using internal::checked_cast;
+using internal::Executor;
 using internal::GetCpuThreadPool;
 using internal::TaskGroup;
 using internal::ThreadPool;
 
 namespace json {
+namespace {
+
+struct ChunkedBlock {
+  std::shared_ptr<Buffer> partial;
+  std::shared_ptr<Buffer> completion;
+  std::shared_ptr<Buffer> whole;
+  int64_t index = -1;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  int64_t num_bytes = 0;
+};
+
+}  // namespace
+}  // namespace json
+
+template <>
+struct IterationTraits<json::ChunkedBlock> {
+  static json::ChunkedBlock End() { return json::ChunkedBlock{}; }
+  static bool IsEnd(const json::ChunkedBlock& val) { return val.index < 0; }
+};
+
+template <>
+struct IterationTraits<json::DecodedBlock> {
+  static json::DecodedBlock End() { return json::DecodedBlock{}; }
+  static bool IsEnd(const json::DecodedBlock& val) { return !val.record_batch; }
+};
+
+namespace json {
+namespace {
+
+// Holds related parameters for parsing and type conversion
+class DecodeContext {
+ public:
+  explicit DecodeContext(MemoryPool* pool)
+      : DecodeContext(ParseOptions::Defaults(), pool) {}
+  explicit DecodeContext(ParseOptions options = ParseOptions::Defaults(),
+                         MemoryPool* pool = default_memory_pool())
+      : pool_(pool) {
+    SetParseOptions(std::move(options));
+  }
+
+  void SetParseOptions(ParseOptions options) {
+    parse_options_ = std::move(options);
+    if (parse_options_.explicit_schema) {
+      conversion_type_ = struct_(parse_options_.explicit_schema->fields());
+    } else {
+      parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+      conversion_type_ = struct_({});
+    }
+    promotion_graph_ =
+        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
+            ? GetPromotionGraph()
+            : nullptr;
+  }
+
+  void SetSchema(std::shared_ptr<Schema> explicit_schema,
+                 UnexpectedFieldBehavior unexpected_field_behavior) {
+    parse_options_.explicit_schema = std::move(explicit_schema);
+    parse_options_.unexpected_field_behavior = unexpected_field_behavior;
+    SetParseOptions(std::move(parse_options_));
+  }
+  void SetSchema(std::shared_ptr<Schema> explicit_schema) {
+    SetSchema(std::move(explicit_schema), parse_options_.unexpected_field_behavior);
+  }
+  // Set the schema but ensure unexpected fields won't be accepted
+  void SetStrictSchema(std::shared_ptr<Schema> explicit_schema) {
+    auto unexpected_field_behavior = parse_options_.unexpected_field_behavior;
+    if (unexpected_field_behavior == UnexpectedFieldBehavior::InferType) {
+      unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+    }
+    SetSchema(std::move(explicit_schema), unexpected_field_behavior);
+  }
+
+  [[nodiscard]] MemoryPool* pool() const { return pool_; }
+  [[nodiscard]] const ParseOptions& parse_options() const { return parse_options_; }
+  [[nodiscard]] const PromotionGraph* promotion_graph() const { return promotion_graph_; }
+  [[nodiscard]] const std::shared_ptr<DataType>& conversion_type() const {
+    return conversion_type_;
+  }
+
+ private:
+  ParseOptions parse_options_;
+  std::shared_ptr<DataType> conversion_type_;
+  const PromotionGraph* promotion_graph_;
+  MemoryPool* pool_;
+};
+
+Result<std::shared_ptr<Array>> ParseBlock(const ChunkedBlock& block,
+                                          const ParseOptions& parse_options,
+                                          MemoryPool* pool, int64_t* out_size = nullptr) {
+  std::unique_ptr<BlockParser> parser;
+  RETURN_NOT_OK(BlockParser::Make(pool, parse_options, &parser));
+
+  int64_t size = block.partial->size() + block.completion->size() + block.whole->size();
+  RETURN_NOT_OK(parser->ReserveScalarStorage(size));
+
+  if (block.partial->size() || block.completion->size()) {
+    std::shared_ptr<Buffer> straddling;
+    if (!block.completion->size()) {
+      straddling = block.partial;
+    } else if (!block.partial->size()) {
+      straddling = block.completion;
+    } else {
+      ARROW_ASSIGN_OR_RAISE(straddling,
+                            ConcatenateBuffers({block.partial, block.completion}, pool));
+    }
+    RETURN_NOT_OK(parser->Parse(straddling));
+  }
+  if (block.whole->size()) {
+    RETURN_NOT_OK(parser->Parse(block.whole));
+  }
+
+  std::shared_ptr<Array> parsed;
+  RETURN_NOT_OK(parser->Finish(&parsed));
+
+  if (out_size) *out_size = size;
+
+  return parsed;
+}
+
+class ChunkingTransformer {
+ public:
+  explicit ChunkingTransformer(std::unique_ptr<Chunker> chunker)
+      : chunker_(std::move(chunker)) {}
+
+  template <typename... Args>
+  static Transformer<std::shared_ptr<Buffer>, ChunkedBlock> Make(Args&&... args) {
+    return [self = std::make_shared<ChunkingTransformer>(std::forward<Args>(args)...)](
+               std::shared_ptr<Buffer> buffer) { return (*self)(std::move(buffer)); };
+  }
+
+ private:
+  Result<TransformFlow<ChunkedBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
+    if (!buffer_) {
+      if (ARROW_PREDICT_TRUE(!next_buffer)) {
+        partial_ = nullptr;
+        return TransformFinish();
+      }
+      partial_ = std::make_shared<Buffer>("");
+      buffer_ = std::move(next_buffer);
+      return TransformSkip();
+    }
+    DCHECK_NE(partial_, nullptr);
+
+    std::shared_ptr<Buffer> whole, completion, next_partial;
+    if (!next_buffer) {
+      // End of file reached => compute completion from penultimate block
+      RETURN_NOT_OK(chunker_->ProcessFinal(partial_, buffer_, &completion, &whole));
+    } else {
+      std::shared_ptr<Buffer> starts_with_whole;
+      // Get completion of partial from previous block.
+      RETURN_NOT_OK(chunker_->ProcessWithPartial(partial_, buffer_, &completion,
+                                                 &starts_with_whole));
+      // Get all whole objects entirely inside the current buffer
+      RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
+    }
+
+    buffer_ = std::move(next_buffer);
+    return TransformYield(ChunkedBlock{std::exchange(partial_, next_partial),
+                                       std::move(completion), std::move(whole),
+                                       index_++});
+  }
+
+  std::unique_ptr<Chunker> chunker_;
+  std::shared_ptr<Buffer> partial_;
+  std::shared_ptr<Buffer> buffer_;
+  int64_t index_ = 0;
+};
+
+template <typename... Args>
+Iterator<ChunkedBlock> MakeChunkingIterator(Iterator<std::shared_ptr<Buffer>> source,
+                                            Args&&... args) {
+  return MakeTransformedIterator(std::move(source),
+                                 ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
+
+template <typename... Args>
+AsyncGenerator<ChunkedBlock> MakeChunkingGenerator(
+    AsyncGenerator<std::shared_ptr<Buffer>> source, Args&&... args) {
+  return MakeTransformedGenerator(std::move(source),
+                                  ChunkingTransformer::Make(std::forward<Args>(args)...));
+}
 
 class TableReaderImpl : public TableReader,
                         public std::enable_shared_from_this<TableReaderImpl> {
  public:
   TableReaderImpl(MemoryPool* pool, const ReadOptions& read_options,
                   const ParseOptions& parse_options,
                   std::shared_ptr<TaskGroup> task_group)
-      : pool_(pool),
+      : decode_context_(parse_options, pool),
         read_options_(read_options),
-        parse_options_(parse_options),
-        chunker_(MakeChunker(parse_options_)),
         task_group_(std::move(task_group)) {}
 
   Status Init(std::shared_ptr<io::InputStream> input) {
     ARROW_ASSIGN_OR_RAISE(auto it,
                           io::MakeInputStreamIterator(input, read_options_.block_size));
     return MakeReadaheadIterator(std::move(it), task_group_->parallelism())
-        .Value(&block_iterator_);
+        .Value(&buffer_iterator_);
   }
 
   Result<std::shared_ptr<Table>> Read() override {
-    RETURN_NOT_OK(MakeBuilder());
-
-    ARROW_ASSIGN_OR_RAISE(auto block, block_iterator_.Next());
-    if (block == nullptr) {
+    auto block_it = MakeChunkingIterator(std::move(buffer_iterator_),
+                                         MakeChunker(decode_context_.parse_options()));
+
+    bool did_read = false;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(auto block, block_it.Next());
+      if (IsIterationEnd(block)) break;
+      if (!did_read) {
+        did_read = true;
+        RETURN_NOT_OK(MakeBuilder());
+      }
+      task_group_->Append(
+          [self = shared_from_this(), block] { return self->ParseAndInsert(block); });
+    }
+    if (!did_read) {
       return Status::Invalid("Empty JSON file");
     }
 
-    auto self = shared_from_this();
-    auto empty = std::make_shared<Buffer>("");
+    std::shared_ptr<ChunkedArray> array;
+    RETURN_NOT_OK(builder_->Finish(&array));
+    return Table::FromChunkedStructArray(array);
+  }
 
-    int64_t block_index = 0;
-    std::shared_ptr<Buffer> partial = empty;
+ private:
+  Status MakeBuilder() {
+    return MakeChunkedArrayBuilder(task_group_, decode_context_.pool(),
+                                   decode_context_.promotion_graph(),
+                                   decode_context_.conversion_type(), &builder_);
+  }
 
-    while (block != nullptr) {
-      std::shared_ptr<Buffer> next_block, whole, completion, next_partial;
+  Status ParseAndInsert(const ChunkedBlock& block) {
+    ARROW_ASSIGN_OR_RAISE(auto parsed, ParseBlock(block, decode_context_.parse_options(),
+                                                  decode_context_.pool()));
+    builder_->Insert(block.index, field("", parsed->type()), parsed);
+    return Status::OK();
+  }
 
-      ARROW_ASSIGN_OR_RAISE(next_block, block_iterator_.Next());
+  DecodeContext decode_context_;
+  ReadOptions read_options_;
+  std::shared_ptr<TaskGroup> task_group_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  std::shared_ptr<ChunkedArrayBuilder> builder_;
+};
 
-      if (next_block == nullptr) {
-        // End of file reached => compute completion from penultimate block
-        RETURN_NOT_OK(chunker_->ProcessFinal(partial, block, &completion, &whole));
-      } else {
-        std::shared_ptr<Buffer> starts_with_whole;
-        // Get completion of partial from previous block.
-        RETURN_NOT_OK(chunker_->ProcessWithPartial(partial, block, &completion,
-                                                   &starts_with_whole));
+// Callable object for parsing/converting individual JSON blocks. The class itself can be
+// called concurrently but reads from the `DecodeContext` aren't synchronized
+class DecodingOperator {
+ public:
+  explicit DecodingOperator(std::shared_ptr<const DecodeContext> context)
+      : context_(std::move(context)) {}
 
-        // Get all whole objects entirely inside the current buffer
-        RETURN_NOT_OK(chunker_->Process(starts_with_whole, &whole, &next_partial));
-      }
+  Result<DecodedBlock> operator()(const ChunkedBlock& block) const {
+    int64_t num_bytes;
+    ARROW_ASSIGN_OR_RAISE(auto unconverted, ParseBlock(block, context_->parse_options(),
+                                                       context_->pool(), &num_bytes));
 
-      // Launch parse task
-      task_group_->Append([self, partial, completion, whole, block_index] {
-        return self->ParseAndInsert(partial, completion, whole, block_index);
-      });
-      block_index++;
+    std::shared_ptr<ChunkedArrayBuilder> builder;
+    RETURN_NOT_OK(MakeChunkedArrayBuilder(TaskGroup::MakeSerial(), context_->pool(),
+                                          context_->promotion_graph(),
+                                          context_->conversion_type(), &builder));
+    builder->Insert(0, field("", unconverted->type()), unconverted);
 
-      partial = next_partial;
-      block = next_block;
-    }
+    std::shared_ptr<ChunkedArray> chunked;
+    RETURN_NOT_OK(builder->Finish(&chunked));
+    ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
 
-    std::shared_ptr<ChunkedArray> array;
-    RETURN_NOT_OK(builder_->Finish(&array));
-    return Table::FromChunkedStructArray(array);
+    return DecodedBlock{std::move(batch), num_bytes};
   }
 
  private:
-  Status MakeBuilder() {
-    auto type = parse_options_.explicit_schema
-                    ? struct_(parse_options_.explicit_schema->fields())
-                    : struct_({});
+  std::shared_ptr<const DecodeContext> context_;
+};
 
-    auto promotion_graph =
-        parse_options_.unexpected_field_behavior == UnexpectedFieldBehavior::InferType
-            ? GetPromotionGraph()
-            : nullptr;
+// TODO(benibus): Replace with `MakeApplyGenerator` from
+// github.com/apache/arrow/pull/14269 if/when it gets merged
+//
+// Reads from the source and spawns fan-out decoding tasks on the given executor
+AsyncGenerator<DecodedBlock> MakeDecodingGenerator(
+    AsyncGenerator<ChunkedBlock> source,
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder,
+    Executor* executor) {
+  struct State {
+    AsyncGenerator<ChunkedBlock> source;
+    std::function<Result<DecodedBlock>(const ChunkedBlock&)> decoder;
+    Executor* executor;
+  } state{std::move(source), std::move(decoder), executor};
+
+  return [state = std::make_shared<State>(std::move(state))] {
+    auto options = CallbackOptions::Defaults();
+    options.executor = state->executor;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source().Then(
+        [state](const ChunkedBlock& block) -> Result<DecodedBlock> {
+          if (IsIterationEnd(block)) {
+            return IterationEnd<DecodedBlock>();
+          } else {
+            return state->decoder(block);
+          }
+        },
+        {}, options);
+  };
+}
 
-    return MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph, type, &builder_);
-  }
-
-  Status ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                        const std::shared_ptr<Buffer>& completion,
-                        const std::shared_ptr<Buffer>& whole, int64_t block_index) {
-    std::unique_ptr<BlockParser> parser;
-    RETURN_NOT_OK(BlockParser::Make(pool_, parse_options_, &parser));
-    RETURN_NOT_OK(parser->ReserveScalarStorage(partial->size() + completion->size() +
-                                               whole->size()));
-
-    if (partial->size() != 0 || completion->size() != 0) {
-      std::shared_ptr<Buffer> straddling;
-      if (partial->size() == 0) {
-        straddling = completion;
-      } else if (completion->size() == 0) {
-        straddling = partial;
-      } else {
-        ARROW_ASSIGN_OR_RAISE(straddling,
-                              ConcatenateBuffers({partial, completion}, pool_));
-      }
-      RETURN_NOT_OK(parser->Parse(straddling));
+class StreamingReaderImpl : public StreamingReader {
+ public:
+  StreamingReaderImpl(DecodedBlock first_block, AsyncGenerator<DecodedBlock> source,
+                      const std::shared_ptr<DecodeContext>& context, int max_readahead)
+      : first_block_(std::move(first_block)),
+        schema_(first_block_->record_batch->schema()),
+        bytes_processed_(std::make_shared<std::atomic<int64_t>>(0)) {
+    // Set the final schema for future invocations of the source generator
+    context->SetStrictSchema(schema_);
+    if (max_readahead > 0) {
+      source = MakeReadaheadGenerator(std::move(source), max_readahead);
     }
+    generator_ = MakeMappedGenerator(
+        std::move(source), [counter = bytes_processed_](const DecodedBlock& out) {
+          counter->fetch_add(out.num_bytes);
+          return out.record_batch;
+        });
+  }
 
-    if (whole->size() != 0) {
-      RETURN_NOT_OK(parser->Parse(whole));
+  static Future<std::shared_ptr<StreamingReaderImpl>> MakeAsync(
+      AsyncGenerator<ChunkedBlock> chunking_gen, std::shared_ptr<DecodeContext> context,
+      Executor* cpu_executor, bool use_threads) {
+    auto source = MakeDecodingGenerator(std::move(chunking_gen),
+                                        DecodingOperator(context), cpu_executor);
+    const int max_readahead = use_threads ? cpu_executor->GetCapacity() : 0;
+    return FirstBlock(source).Then([source = std::move(source),
+                                    context = std::move(context),
+                                    max_readahead](const DecodedBlock& block) {
+      return std::make_shared<StreamingReaderImpl>(block, std::move(source), context,
+                                                   max_readahead);
+    });
+  }
+
+  [[nodiscard]] std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    auto result = ReadNextAsync().result();
+    return std::move(result).Value(out);
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    // On the first call, return the batch we used for initialization
+    if (ARROW_PREDICT_FALSE(first_block_)) {
+      bytes_processed_->fetch_add(first_block_->num_bytes);
+      auto batch = std::exchange(first_block_, std::nullopt)->record_batch;
+      return ToFuture(std::move(batch));
     }
+    return generator_();
+  }
 
-    std::shared_ptr<Array> parsed;
-    RETURN_NOT_OK(parser->Finish(&parsed));
-    builder_->Insert(block_index, field("", parsed->type()), parsed);
-    return Status::OK();
+  [[nodiscard]] int64_t bytes_read() const override { return bytes_processed_->load(); }
+
+ private:
+  static Future<DecodedBlock> FirstBlock(AsyncGenerator<DecodedBlock> gen) {
+    // Read from the stream until we get a non-empty record batch that we can use to
+    // declare the schema. Along the way, accumulate the bytes read so they can be
+    // recorded on the first `ReadNextAsync`
+    auto out = std::make_shared<DecodedBlock>();
+    DCHECK_EQ(out->num_bytes, 0);
+    auto loop_body = [gen = std::move(gen),
+                      out = std::move(out)]() -> Future<ControlFlow<DecodedBlock>> {
+      return gen().Then(
+          [out](const DecodedBlock& block) -> Result<ControlFlow<DecodedBlock>> {
+            if (IsIterationEnd(block)) {
+              return Status::Invalid("Empty JSON stream");
+            }
+            out->num_bytes += block.num_bytes;
+            if (block.record_batch->num_rows() == 0) {
+              return Continue();
+            }
+            out->record_batch = block.record_batch;
+            return Break(*out);
+          });
+    };
+    return Loop(std::move(loop_body));
   }
 
-  MemoryPool* pool_;
-  ReadOptions read_options_;
-  ParseOptions parse_options_;
-  std::unique_ptr<Chunker> chunker_;
-  std::shared_ptr<TaskGroup> task_group_;
-  Iterator<std::shared_ptr<Buffer>> block_iterator_;
-  std::shared_ptr<ChunkedArrayBuilder> builder_;
+  std::optional<DecodedBlock> first_block_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<std::atomic<int64_t>> bytes_processed_;
+  AsyncGenerator<std::shared_ptr<RecordBatch>> generator_;
 };
 
+template <typename T>
+Result<AsyncGenerator<T>> MakeReentrantGenerator(AsyncGenerator<T> source) {
+  struct State {
+    AsyncGenerator<T> source;
+    std::shared_ptr<ThreadPool> thread_pool;
+  } state{std::move(source), nullptr};
+  ARROW_ASSIGN_OR_RAISE(state.thread_pool, ThreadPool::Make(1));
+
+  return [state = std::make_shared<State>(std::move(state))]() -> Future<T> {
+    auto maybe_future =
+        state->thread_pool->Submit([state] { return state->source().result(); });
+    return DeferNotOk(std::move(maybe_future));
+  };
+}
+
+// Compose an async-reentrant `ChunkedBlock` generator using a sequentially-accessed
+// `InputStream`
+Result<AsyncGenerator<ChunkedBlock>> MakeChunkingGenerator(
+    std::shared_ptr<io::InputStream> stream, int32_t block_size,
+    std::unique_ptr<Chunker> chunker, Executor* io_executor, Executor* cpu_executor) {
+  ARROW_ASSIGN_OR_RAISE(auto source_it,
+                        io::MakeInputStreamIterator(std::move(stream), block_size));
+  ARROW_ASSIGN_OR_RAISE(auto source_gen,
+                        MakeBackgroundGenerator(std::move(source_it), io_executor));
+  source_gen = MakeTransferredGenerator(std::move(source_gen), cpu_executor);
+
+  auto gen = MakeChunkingGenerator(std::move(source_gen), std::move(chunker));
+  ARROW_ASSIGN_OR_RAISE(gen, MakeReentrantGenerator(std::move(gen)));
+  return gen;

Review Comment:
   Ok, so AFAIU the issue is:
   1) chunking is inherently serial
   2) chunking is CPU-expensive (because we have to lex the JSON stream)
   which is why the easy solution is to transfer it to a 1-thread executor.
   
   Can you add a comment to explain this?
   Also we can probably simplify the code and get rid of `MakeReentrantGenerator` (untested):
   ```suggestion
   
     ARROW_ASSIGN_OR_RAISE(auto chunking_executor, ThreadPool::Make(1));
     // NOTE: keep `chunking_executor` (a shared_ptr) alive for as long as the
     // returned generator is used; `MakeTransferredGenerator` takes a raw `Executor*`.
     source_gen = MakeTransferredGenerator(std::move(source_gen), chunking_executor.get());
     auto gen = MakeChunkingGenerator(std::move(source_gen), std::move(chunker));
     return MakeTransferredGenerator(std::move(gen), cpu_executor);
   ```
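   As an illustration of the explanatory comment requested above (wording is only a proposal, not from the PR), it could read roughly like:
   ```c++
   // Chunking has to run one block at a time: each block's trailing partial bytes
   // seed the next block, and finding object boundaries means lexing the JSON,
   // which is CPU-bound. So hop onto a dedicated single-threaded executor for the
   // chunking step, then hand completed blocks back to the CPU executor.
   ```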
   



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_read(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 2, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 2, "b": null, "c": null}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 1);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_read(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  EXPECT_EQ(batches.back(), nullptr);
+  batches.pop_back();
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches);
+  std::weak_ptr<StreamingReader> weak_reader;
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    weak_reader = reader;
+    EXPECT_EQ(reader->bytes_read(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  auto all_future = All(std::move(futures));
+  AssertNotFinished(all_future);
+  EXPECT_EQ(weak_reader.use_count(), 0);

Review Comment:
   I expect the reason this _might_ fail is that the `Then` callback in `StreamingReader::MakeAsync` is executed on a separate thread, which unblocks the main thread immediately while the future remains alive for a short while after the callback ends. Only a hypothesis, though.
   
   If you want to keep this, you might instead write:
   ```c++
     BusyWait(/*seconds=*/ 1.0, [&]() { return weak_reader.expired(); });
     ASSERT_TRUE(weak_reader.expired());
     auto all_future = All(std::move(futures));
     AssertNotFinished(all_future);
   ```
   



##########
cpp/src/arrow/json/reader_test.cc:
##########
@@ -305,5 +309,530 @@ TEST(ReaderTest, ListArrayWithFewValues) {
   AssertTablesEqual(*actual_table, *expected_table);
 }
 
+class StreamingReaderTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override { read_options_.use_threads = GetParam(); }
+
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str) {
+    auto buffer = std::make_shared<Buffer>(str);
+    return std::make_shared<io::BufferReader>(std::move(buffer));
+  }
+  // Stream with simulated latency
+  static std::shared_ptr<io::InputStream> MakeTestStream(const std::string& str,
+                                                         double latency) {
+    return std::make_shared<io::SlowInputStream>(MakeTestStream(str), latency);
+  }
+
+  Result<std::shared_ptr<StreamingReader>> MakeReader(
+      std::shared_ptr<io::InputStream> stream) {
+    return StreamingReader::Make(std::move(stream), io_context_, executor_, read_options_,
+                                 parse_options_);
+  }
+  template <typename... Args>
+  Result<std::shared_ptr<StreamingReader>> MakeReader(Args&&... args) {
+    return MakeReader(MakeTestStream(std::forward<Args>(args)...));
+  }
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> MakeGenerator(
+      std::shared_ptr<StreamingReader> reader) {
+    return [reader = std::move(reader)] { return reader->ReadNextAsync(); };
+  }
+  template <typename... Args>
+  Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> MakeGenerator(Args&&... args) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, MakeReader(std::forward<Args>(args)...));
+    return MakeGenerator(std::move(reader));
+  }
+
+  static void AssertReadNext(const std::shared_ptr<StreamingReader>& reader,
+                             std::shared_ptr<RecordBatch>* out) {
+    ASSERT_OK(reader->ReadNext(out));
+    ASSERT_FALSE(IsIterationEnd(*out));
+  }
+  static void AssertReadEnd(const std::shared_ptr<StreamingReader>& reader) {
+    std::shared_ptr<RecordBatch> out;
+    ASSERT_OK(reader->ReadNext(&out));
+    ASSERT_TRUE(IsIterationEnd(out));
+  }
+
+  struct TestCase {
+    std::string json;
+    int json_size;
+    int block_size;
+    int num_rows;
+    int num_batches;
+    std::shared_ptr<Schema> schema;
+    RecordBatchVector batches;
+    std::shared_ptr<Table> table;
+  };
+
+  // Creates a test case from valid JSON objects with a human-readable index field and a
+  // struct field of random data. `block_size_multiplier` is applied to the largest
+  // generated row length to determine the target block_size. i.e - higher multiplier
+  // means fewer batches
+  static TestCase GenerateTestCase(int num_rows, double block_size_multiplier = 3.0) {
+    FieldVector data_fields = {field("s", utf8()), field("f", float64()),
+                               field("b", boolean())};
+    FieldVector fields = {field("i", int64()), field("d", struct_({data_fields}))};
+    TestCase out;
+    out.schema = schema(fields);
+    out.num_rows = num_rows;
+
+    constexpr int kSeed = 0x432432;
+    std::default_random_engine engine(kSeed);
+    std::vector<std::string> rows(num_rows);
+    size_t max_row_size = 1;
+
+    auto options = GenerateOptions::Defaults();
+    options.null_probability = 0;
+    for (int i = 0; i < num_rows; ++i) {
+      StringBuffer string_buffer;
+      Writer writer(string_buffer);
+      ABORT_NOT_OK(Generate(data_fields, engine, &writer, options));
+      std::string json = string_buffer.GetString();
+      rows[i] = Join({"{\"i\":", std::to_string(i), ",\"d\":", json, "}\n"});
+      max_row_size = std::max(max_row_size, rows[i].size());
+    }
+
+    auto block_size = static_cast<size_t>(max_row_size * block_size_multiplier);
+    // Deduce the expected record batches from the target block size.
+    std::vector<std::string> batch_rows;
+    size_t pos = 0;
+    for (const auto& row : rows) {
+      pos += row.size();
+      if (pos > block_size) {
+        out.batches.push_back(
+            RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+        batch_rows.clear();
+        pos -= block_size;
+      }
+      batch_rows.push_back(row);
+      out.json += row;
+    }
+    if (!batch_rows.empty()) {
+      out.batches.push_back(
+          RecordBatchFromJSON(out.schema, Join({"[", Join(batch_rows, ","), "]"})));
+    }
+
+    out.json_size = static_cast<int>(out.json.size());
+    out.block_size = static_cast<int>(block_size);
+    out.num_batches = static_cast<int>(out.batches.size());
+    out.table = *Table::FromRecordBatches(out.batches);
+
+    return out;
+  }
+
+  static std::string Join(const std::vector<std::string>& strings,
+                          const std::string& delim = "", bool trailing_delim = false) {
+    std::string out;
+    for (size_t i = 0; i < strings.size();) {
+      out += strings[i++];
+      if (i != strings.size() || trailing_delim) {
+        out += delim;
+      }
+    }
+    return out;
+  }
+
+  internal::Executor* executor_ = internal::GetCpuThreadPool();
+  ParseOptions parse_options_ = ParseOptions::Defaults();
+  ReadOptions read_options_ = ReadOptions::Defaults();
+  io::IOContext io_context_ = io::default_io_context();
+};
+
+INSTANTIATE_TEST_SUITE_P(StreamingReaderTest, StreamingReaderTest,
+                         ::testing::Values(false, true));
+
+TEST_P(StreamingReaderTest, ErrorOnEmptyStream) {
+  ASSERT_RAISES(Invalid, MakeReader(""));
+  std::string data(100, '\n');
+  for (auto block_size : {25, 49, 50, 100, 200}) {
+    read_options_.block_size = block_size;
+    ASSERT_RAISES(Invalid, MakeReader(data));
+  }
+}
+
+TEST_P(StreamingReaderTest, PropagateChunkingErrors) {
+  constexpr double kIoLatency = 1e-3;
+
+  auto test_schema = schema({field("i", int64())});
+  auto bad_first_chunk = Join(
+      {
+          R"({"i": 0            })",
+          R"({"i": 1})",
+      },
+      "\n");
+  auto bad_middle_chunk = Join(
+      {
+          R"({"i": 0})",
+          R"({"i":    1})",
+          R"({"i": 2})",
+      },
+      "\n");
+
+  read_options_.block_size = 10;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_chunk));
+
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_chunk, kIoLatency));
+
+  std::shared_ptr<RecordBatch> batch;
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 9);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, "[{\"i\":0}]"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 9);
+  AssertReadEnd(reader);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 9);
+}
+
+TEST_P(StreamingReaderTest, PropagateParsingErrors) {
+  auto test_schema = schema({field("n", int64())});
+  auto bad_first_block = Join(
+      {
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_first_block_after_empty = Join(
+      {
+          R"(            )",
+          R"({"n": })",
+          R"({"n": 10000})",
+      },
+      "\n");
+  auto bad_middle_block = Join(
+      {
+          R"({"n": 10000})",
+          R"({"n": 200 0})",
+          R"({"n": 30000})",
+      },
+      "\n");
+
+  read_options_.block_size = 16;
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block));
+  ASSERT_RAISES(Invalid, MakeReader(bad_first_block_after_empty));
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(bad_middle_block));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  ASSERT_NE(reader->schema(), nullptr);
+  EXPECT_EQ(*reader->schema(), *test_schema);
+
+  AssertReadNext(reader, &batch);
+  EXPECT_EQ(reader->bytes_read(), 13);
+  ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(test_schema, R"([{"n":10000}])"), *batch);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&batch));
+  EXPECT_EQ(reader->bytes_read(), 13);
+  AssertReadEnd(reader);
+  EXPECT_EQ(reader->bytes_read(), 13);
+}
+
+TEST_P(StreamingReaderTest, IgnoreLeadingEmptyBlocks) {
+  std::string test_json(32, '\n');
+  test_json += R"({"b": true, "s": "foo"})";
+  ASSERT_EQ(test_json.length(), 55);
+
+  parse_options_.explicit_schema = schema({field("b", boolean()), field("s", utf8())});
+  read_options_.block_size = 24;
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(test_json));
+  EXPECT_EQ(reader->bytes_read(), 0);
+
+  auto expected_schema = parse_options_.explicit_schema;
+  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([{"b":true,"s":"foo"}])");
+
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  std::shared_ptr<RecordBatch> actual_batch;
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 55);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaErrorOnUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "t": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = 48;
+  ASSERT_RAISES(Invalid, MakeReader(test_json));
+
+  expected_fields.push_back(field("t", utf8()));
+  expected_schema = schema(expected_fields);
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+
+  parse_options_.explicit_schema = expected_schema;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  ASSERT_RAISES(Invalid, reader->ReadNext(&actual_batch));
+  EXPECT_EQ(reader->bytes_read(), 64);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, ExplicitSchemaIgnoreUnexpectedFields) {
+  std::string test_json =
+      Join({R"({"s": "foo", "u": "2022-01-01"})", R"({"s": "foo", "t": "2022-01-01"})",
+            R"({"s": "foo", "t": "2022-01-01", "b": true})"},
+           "\n");
+
+  FieldVector expected_fields = {field("s", utf8()), field("t", utf8())};
+  std::shared_ptr<Schema> expected_schema = schema(expected_fields);
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+  std::shared_ptr<StreamingReader> reader;
+
+  parse_options_.explicit_schema = expected_schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Ignore;
+  read_options_.block_size = 48;
+
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":null}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 32);
+
+  expected_batch =
+      RecordBatchFromJSON(expected_schema, R"([{"s":"foo","t":"2022-01-01"}])");
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 64);
+
+  AssertReadNext(reader, &actual_batch);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 106);
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, InferredSchema) {
+  auto test_json = Join(
+      {
+          R"({"a": 0, "b": "foo"       })",
+          R"({"a": 1, "c": true        })",
+          R"({"a": 2, "d": "2022-01-01"})",
+      },
+      "\n", true);
+
+  std::shared_ptr<StreamingReader> reader;
+  std::shared_ptr<Schema> expected_schema;
+  std::shared_ptr<RecordBatch> expected_batch;
+  std::shared_ptr<RecordBatch> actual_batch;
+
+  FieldVector fields = {field("a", int64()), field("b", utf8())};
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
+  parse_options_.explicit_schema = nullptr;
+
+  // Schema derived from the first line
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 32;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 0, "b": "foo"}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 28);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 1, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([{"a": 2, "b": null}])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from the first 2 lines
+  fields.push_back(field("c", boolean()));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 64;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null},
+    {"a": 1, "b":  null, "c": true}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 56);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 2, "b": null, "c": null}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  // Schema derived from all 3 lines
+  fields.push_back(field("d", timestamp(TimeUnit::SECOND)));
+  expected_schema = schema(fields);
+
+  read_options_.block_size = 96;
+  ASSERT_OK_AND_ASSIGN(reader, MakeReader(test_json));
+  ASSERT_NE(reader->schema(), nullptr);
+  ASSERT_EQ(*reader->schema(), *expected_schema);
+
+  expected_batch = RecordBatchFromJSON(expected_schema, R"([
+    {"a": 0, "b": "foo", "c": null, "d":  null},
+    {"a": 1, "b":  null, "c": true, "d":  null},
+    {"a": 2, "b":  null, "c": null, "d":  "2022-01-01"}
+  ])");
+  AssertReadNext(reader, &actual_batch);
+  EXPECT_EQ(reader->bytes_read(), 84);
+  ASSERT_BATCHES_EQUAL(*expected_batch, *actual_batch);
+
+  AssertReadEnd(reader);
+}
+
+TEST_P(StreamingReaderTest, AsyncReentrancy) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 1);
+  ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(expected.json, kIoLatency));
+  EXPECT_EQ(reader->bytes_read(), 0);
+  for (auto& future : futures) {
+    future = reader->ReadNextAsync();
+  }
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
+  EXPECT_EQ(reader->bytes_read(), expected.json_size);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  EXPECT_EQ(batches.back(), nullptr);
+  batches.pop_back();
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, FuturesOutliveReader) {
+  constexpr int kNumRows = 16;
+  constexpr double kIoLatency = 1e-2;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  auto stream = MakeTestStream(expected.json, kIoLatency);
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches);
+  std::weak_ptr<StreamingReader> weak_reader;
+  {
+    ASSERT_OK_AND_ASSIGN(auto reader, MakeReader(stream));
+    weak_reader = reader;
+    EXPECT_EQ(reader->bytes_read(), 0);
+    for (auto& future : futures) {
+      future = reader->ReadNextAsync();
+    }
+  }
+
+  auto all_future = All(std::move(futures));
+  AssertNotFinished(all_future);
+  EXPECT_EQ(weak_reader.use_count(), 0);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto results, all_future);
+  ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, NestedParallelism) {
+  constexpr int kNumRows = 16;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+  auto task = [&generator] { return CollectAsyncGenerator(generator).result(); };
+
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  ASSERT_OK_AND_ASSIGN(generator, MakeGenerator(expected.json));
+  ASSERT_OK_AND_ASSIGN(auto batches_future, thread_pool->Submit(task));
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, batches_future);
+  ASSERT_EQ(batches.size(), expected.batches.size());
+  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
+  ASSERT_TABLES_EQUAL(*expected.table, *table);
+}
+
+TEST_P(StreamingReaderTest, StressBufferedReads) {
+  constexpr int kNumRows = 500;
+
+  auto expected = GenerateTestCase(kNumRows);
+  parse_options_.explicit_schema = expected.schema;
+  parse_options_.unexpected_field_behavior = UnexpectedFieldBehavior::Error;
+  read_options_.block_size = expected.block_size;
+
+  std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches);

Review Comment:
   Should we add one or two futures for EOF results?
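   If so, a sketch of what that could look like (mirroring the `AsyncReentrancy` test above and assuming this test's local `reader` and `expected`; the extra count of two is arbitrary):
   ```c++
     // Ask for two more batches than the stream can produce
     std::vector<Future<std::shared_ptr<RecordBatch>>> futures(expected.num_batches + 2);
     for (auto& future : futures) {
       future = reader->ReadNextAsync();
     }
     ASSERT_FINISHES_OK_AND_ASSIGN(auto results, All(std::move(futures)));
     ASSERT_OK_AND_ASSIGN(auto batches, internal::UnwrapOrRaise(std::move(results)));
     // The trailing reads should resolve to the end-of-iteration sentinel (nullptr)
     ASSERT_TRUE(IsIterationEnd(batches[batches.size() - 2]));
     ASSERT_TRUE(IsIterationEnd(batches.back()));
     batches.resize(batches.size() - 2);
     ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));
     ASSERT_TABLES_EQUAL(*expected.table, *table);
   ```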


