Posted to commits@impala.apache.org by st...@apache.org on 2023/09/05 22:18:33 UTC

[impala] branch master updated: IMPALA-10798: Initial support for reading JSON files

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f06a7b05 IMPALA-10798: Initial support for reading JSON files
2f06a7b05 is described below

commit 2f06a7b052cc95afcf4b0485cbc4028de33942e8
Author: Eyizoha <18...@163.com>
AuthorDate: Tue Mar 21 18:05:27 2023 +0800

    IMPALA-10798: Initial support for reading JSON files
    
    Add a prototype of HdfsJsonScanner, implemented on top of rapidjson,
    that supports scanning data from JSON files split across scan ranges.
    
    Scanning JSON data is handled mainly by two parts working together.
    The first part is the JsonParser, which is responsible for parsing JSON
    objects and is implemented on top of the SAX-style API of rapidjson. It
    reads data from the char stream, parses it, and calls the matching
    callback function for each JSON element it encounters. See the comments
    of the JsonParser class for more details.
    
    The other part is the HdfsJsonScanner, which inherits from HdfsScanner
    and provides callback functions for the JsonParser. The callback
    functions are responsible for providing data buffers to the Parser and
    converting and materializing the Parser's parsing results into RowBatch.
    Note that the parser returns numeric values to the scanner as strings.
    The scanner uses the TextConverter class to convert the strings to the
    desired types, similar to how the HdfsTextScanner works. This is an
    advantage over using the numeric values provided by rapidjson directly,
    as it avoids inconsistencies when converting decimals (e.g. loss of
    precision).
    
    Added a startup flag, enable_json_scanner, to be able to disable this
    feature if we hit critical bugs in production.
    
    Limitations
     - Multiline json objects are not fully supported yet. It is ok when
       each file has only one scan range. However, when a file has multiple
       scan ranges, there is a small probability of incomplete scanning of
       multiline JSON objects that span ScanRange boundaries (in such cases,
       parsing errors may be reported). For more details, please refer to
       the comments in the 'multiline_json.test'.
     - Compressed JSON files are not supported yet.
     - Complex types are not supported yet.
    
    Tests
     - Most of the existing end-to-end tests can run on JSON format.
     - Add TestQueriesJsonTables in test_queries.py for testing multiline,
       malformed, and overflow in JSON.
    
    Change-Id: I31309cb8f2d04722a0508b3f9b8f1532ad49a569
    Reviewed-on: http://gerrit.cloudera.org:8080/19699
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
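Note on the numeric handling described in the commit message: the parser
hands numbers to the scanner as strings instead of using the numeric values
parsed by rapidjson. The sketch below is not Impala code. It is a minimal,
standalone C++ illustration (ThroughDouble is a hypothetical helper) of why
routing a decimal literal through a double can change its digits, which is
the kind of inconsistency the string-based path avoids.

```cpp
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <string>

// Hypothetical helper: parses a decimal literal into a double and formats it
// back with 'scale' fractional digits, mimicking a numeric round trip.
std::string ThroughDouble(const std::string& literal, int scale) {
  assert(scale >= 0 && scale <= 30);
  double d = std::strtod(literal.c_str(), nullptr);
  char buf[128];
  std::snprintf(buf, sizeof(buf), "%.*f", scale, d);
  return std::string(buf);
}

// Keeping the literal as text preserves every digit until a type-aware
// converter (TextConverter in the commit) decides how to interpret it.
std::string ThroughString(const std::string& literal) {
  return literal;
}
```

For example, ThroughDouble("1234567890.12345678901", 11) does not reproduce
the literal, because a double of that magnitude cannot hold that many
fractional digits, whereas a short literal such as "0.5" survives unchanged.
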
 be/CMakeLists.txt                                  |   3 +
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/hdfs-scan-node-base.cc                 |  11 +
 be/src/exec/json/CMakeLists.txt                    |  36 +++
 be/src/exec/json/hdfs-json-scanner.cc              | 335 +++++++++++++++++++++
 be/src/exec/json/hdfs-json-scanner.h               | 179 +++++++++++
 be/src/exec/json/json-parser-test.cc               | 126 ++++++++
 be/src/exec/json/json-parser.cc                    | 234 ++++++++++++++
 be/src/exec/json/json-parser.h                     | 301 ++++++++++++++++++
 be/src/exec/text-converter.inline.h                |   1 -
 be/src/util/backend-gflag-util.cc                  |   2 +
 bin/rat_exclude_files.txt                          |   1 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/catalog/HdfsFileFormat.java  |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  18 +-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 testdata/bin/create-load-data.sh                   |   4 +-
 testdata/bin/generate-schema-statements.py         |   4 +-
 testdata/bin/load-dependent-tables.sql             |   7 +
 testdata/data/chars-formats.json                   |   3 +
 testdata/data/json_test/complex.json               |   5 +
 testdata/data/json_test/malformed.json             |  18 ++
 testdata/data/json_test/multiline.json             |  17 ++
 testdata/data/json_test/overflow.json              |   6 +
 .../functional/functional_schema_template.sql      |  60 ++++
 .../datasets/functional/schema_constraints.csv     |  14 +
 .../functional-query/functional-query_core.csv     |   1 +
 .../functional-query_dimensions.csv                |   2 +-
 .../functional-query_exhaustive.csv                |   1 +
 .../functional-query/functional-query_pairwise.csv |   1 +
 .../DataErrorsTest/hdfs-json-scan-node-errors.test | 180 +++++++++++
 .../queries/QueryTest/complex_json.test            |  14 +
 .../queries/QueryTest/disable-json-scanner.test    |   7 +
 .../queries/QueryTest/malformed_json.test          |  25 ++
 .../queries/QueryTest/multiline_json.test          |  27 ++
 .../queries/QueryTest/overflow_json.test           |  20 ++
 testdata/workloads/tpcds/tpcds_core.csv            |   1 +
 testdata/workloads/tpcds/tpcds_exhaustive.csv      |   1 +
 testdata/workloads/tpcds/tpcds_pairwise.csv        |   1 +
 testdata/workloads/tpch/tpch_core.csv              |   1 +
 testdata/workloads/tpch/tpch_dimensions.csv        |   2 +-
 testdata/workloads/tpch/tpch_exhaustive.csv        |   1 +
 testdata/workloads/tpch/tpch_pairwise.csv          |   1 +
 tests/common/test_dimensions.py                    |  11 +-
 tests/custom_cluster/test_disable_features.py      |   5 +
 tests/data_errors/test_data_errors.py              |  13 +
 tests/metadata/test_hms_integration.py             |  23 --
 tests/query_test/test_cancellation.py              |   7 +-
 tests/query_test/test_chars.py                     |  10 +-
 tests/query_test/test_date_queries.py              |   5 +-
 tests/query_test/test_decimal_queries.py           |   2 +-
 tests/query_test/test_queries.py                   |  36 ++-
 tests/query_test/test_scanners.py                  |   2 +
 tests/query_test/test_scanners_fuzz.py             |   4 +-
 tests/query_test/test_tpch_queries.py              |   3 +-
 55 files changed, 1747 insertions(+), 54 deletions(-)
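
Note on the scan range limitation listed in the commit message: the header
comment of HdfsJsonScanner describes the splitting scheme, where each scanner
starts right after the first row delimiter in its scan range and stops at the
first row delimiter past the end of the range. The sketch below is a
simplified, standalone illustration of that rule, assuming newline-delimited
single-line JSON objects held in one in-memory string. RowsForRange and
ScanAll are hypothetical names, not functions from this patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Returns the rows owned by the scan range [offset, offset + len) of 'file'.
// A row belongs to the range that contains the delimiter preceding it; the
// first row of the file belongs to the range that starts at offset 0.
std::vector<std::string> RowsForRange(
    const std::string& file, size_t offset, size_t len) {
  assert(offset + len <= file.size());
  std::vector<std::string> rows;
  const size_t end = offset + len;
  size_t pos = offset;
  if (offset != 0) {
    // Skip the partial row at the start; the previous range picks it up.
    size_t nl = file.find('\n', offset);
    if (nl == std::string::npos || nl >= end) return rows;  // no delimiter here
    pos = nl + 1;
  }
  // Keep reading while the row's preceding delimiter lies inside this range,
  // even if the row itself extends past the end of the range.
  while (pos < file.size() && pos <= end) {
    size_t nl = file.find('\n', pos);
    if (nl == std::string::npos) nl = file.size();
    rows.push_back(file.substr(pos, nl - pos));
    pos = nl + 1;
  }
  return rows;
}

// Splits 'file' into fixed-size ranges and concatenates what each one yields.
std::vector<std::string> ScanAll(const std::string& file, size_t range_size) {
  assert(range_size > 0);
  std::vector<std::string> all;
  for (size_t off = 0; off < file.size(); off += range_size) {
    size_t len = std::min(range_size, file.size() - off);
    for (const std::string& row : RowsForRange(file, off, len)) {
      all.push_back(row);
    }
  }
  return all;
}
```

With a file of three single-line objects, ScanAll returns each row exactly
once for any range size. A JSON object that itself contains newlines breaks
the assumption that a newline is a row delimiter, which is consistent with
the multiline limitation noted in the commit message.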

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index e15d21c95..8c4a4cf0b 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -487,6 +487,7 @@ set (IMPALA_LIBS
   ExecAvro
   ExecAvroIr
   ExecHBase
+  ExecJson
   ExecKudu
   ExecKuduIr
   ExecOrc
@@ -554,6 +555,7 @@ set (UNIFIED_TEST_LIBS
   CommonTests
   ExecTests
   ExecAvroTests
+  ExecJsonTests
   ExecParquetTests
   ExprsTests
   GUtilTests
@@ -585,6 +587,7 @@ if (BUILD_SHARED_LIBS)
     Exec
     ExecAvro
     ExecHBase
+    ExecJson
     ExecKudu
     ExecOrc
     ExecParquet
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index be4283a48..12e2534e5 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -17,6 +17,7 @@
 
 add_subdirectory(avro)
 add_subdirectory(hbase)
+add_subdirectory(json)
 add_subdirectory(kudu)
 add_subdirectory(orc)
 add_subdirectory(parquet)
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 9a12d0bfc..517fdfe57 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -28,6 +28,7 @@
 #include "exec/sequence/hdfs-sequence-scanner.h"
 #include "exec/text/hdfs-text-scanner.h"
 #include "exec/text/hdfs-plugin-text-scanner.h"
+#include "exec/json/hdfs-json-scanner.h"
 
 
 #include <avro/errors.h>
@@ -744,6 +745,9 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
       case THdfsFileFormat::ORC:
         RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this, entry.second));
         break;
+      case THdfsFileFormat::JSON:
+        RETURN_IF_ERROR(HdfsJsonScanner::IssueInitialRanges(this, entry.second));
+        break;
       default:
         DCHECK(false) << "Unexpected file type " << entry.first;
     }
@@ -945,6 +949,13 @@ Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* par
       case THdfsFileFormat::ORC:
         scanner->reset(new HdfsOrcScanner(this, runtime_state_));
         break;
+      case THdfsFileFormat::JSON:
+        if (HdfsJsonScanner::HasBuiltinSupport(compression)) {
+          scanner->reset(new HdfsJsonScanner(this, runtime_state_));
+        } else {
+          return Status("Scanning compressed Json file is not implemented yet.");
+        }
+        break;
       default:
         return Status(
             Substitute("Unknown Hdfs file format type: $0", partition->file_format()));
diff --git a/be/src/exec/json/CMakeLists.txt b/be/src/exec/json/CMakeLists.txt
new file mode 100644
index 000000000..e654ad9f4
--- /dev/null
+++ b/be/src/exec/json/CMakeLists.txt
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/json")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/json")
+
+add_library(ExecJson
+  hdfs-json-scanner.cc
+  json-parser.cc
+)
+
+add_dependencies(ExecJson gen-deps)
+
+add_library(ExecJsonTests STATIC
+  json-parser-test.cc
+)
+add_dependencies(ExecJsonTests gen-deps)
+
+ADD_UNIFIED_BE_LSAN_TEST(json-parser-test StreamSize/JsonParserTest.*)
diff --git a/be/src/exec/json/hdfs-json-scanner.cc b/be/src/exec/json/hdfs-json-scanner.cc
new file mode 100644
index 000000000..adc07f887
--- /dev/null
+++ b/be/src/exec/json/hdfs-json-scanner.cc
@@ -0,0 +1,335 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/json/hdfs-json-scanner.h"
+
+#include "common/names.h"
+#include "common/status.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/scanner-context.inline.h"
+#include "exec/text-converter.h"
+#include "exec/text-converter.inline.h"
+#include "runtime/multi-precision.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tuple-row.h"
+#include "util/decompress.h"
+#include "util/debug-util.h"
+#include "util/scope-exit-trigger.h"
+#include "util/string-parser.h"
+
+#include <gutil/strings/substitute.h>
+#include <map>
+
+using namespace impala;
+using namespace impala::io;
+using namespace strings;
+
+DEFINE_bool(enable_json_scanner, true,
+    "If set false, disable reading from json format tables.");
+
+HdfsJsonScanner::HdfsJsonScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+    : HdfsScanner(scan_node, state),
+      scanner_state_(CREATED),
+      buffer_status_(Status::OK()),
+      tuple_row_(nullptr),
+      current_pool_(nullptr),
+      error_in_row_(false),
+      num_tuples_materialized_(0),
+      parse_json_timer_(nullptr),
+      get_buffer_timer_(nullptr) { }
+
+HdfsJsonScanner::~HdfsJsonScanner() { }
+
+Status HdfsJsonScanner::Open(ScannerContext* context) {
+  DCHECK_EQ(scanner_state_, CREATED);
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+
+  parse_json_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "ParseJsonTime");
+  get_buffer_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "GetDataBufferTime");
+  RETURN_IF_ERROR(InitNewRange());
+  scanner_state_ = OPENED;
+  return Status::OK();
+}
+
+void HdfsJsonScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
+  if (row_batch != nullptr) {
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          std::unique_ptr<RowBatch>(row_batch));
+    }
+  } else {
+    template_tuple_pool_->FreeAll();
+  }
+  data_buffer_pool_->FreeAll();
+  context_->ReleaseCompletedResources(true);
+
+  // Verify all resources (if any) have been transferred or freed.
+  DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
+  DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
+  scan_node_->RangeComplete(THdfsFileFormat::JSON,
+      stream_->file_desc()->file_compression);
+  CloseInternal();
+}
+
+Status HdfsJsonScanner::InitNewRange() {
+  DCHECK_EQ(scanner_state_, CREATED);
+
+  // TODO: Optimize for empty projection.
+  vector<string> schema;
+  schema.reserve(scan_node_->materialized_slots().size());
+  for (const SlotDescriptor* slot : scan_node_->materialized_slots()) {
+    schema.push_back(scan_node_->hdfs_table()->GetColumnDesc(slot).name());
+  }
+
+  text_converter_.reset(new TextConverter('\\', "", false, state_->strict_mode()));
+  json_parser_.reset(new JsonParser<HdfsJsonScanner>(schema, this));
+  json_parser_->ResetParser();
+  return Status::OK();
+}
+
+Status HdfsJsonScanner::GetNextInternal(RowBatch* row_batch) {
+  DCHECK(!eos_);
+  DCHECK_GE(scanner_state_, OPENED);
+  DCHECK_NE(scanner_state_, FINISHED);
+
+  current_pool_ = row_batch->tuple_data_pool();
+
+  if (scanner_state_ == OPENED) {
+    // Find the first tuple. If scanner_state_ is not SCANNING, it means we went through
+    // the entire scan range without finding a single tuple. The bytes will be picked up
+    // by the previous scan range in the same file.
+    RETURN_IF_ERROR(FindFirstTuple());
+    if (scanner_state_ != SCANNING) {
+      eos_ = true;
+      scanner_state_ = FINISHED;
+      return Status::OK();
+    }
+  }
+
+  int64_t tuple_buffer_size;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
+  tuple_row_ = row_batch->GetRow(row_batch->AddRow());
+
+  while (scanner_state_ == SCANNING) {
+    num_tuples_materialized_ = 0;
+    int num_tuples = 0;
+    int max_tuples = row_batch->capacity() - row_batch->num_rows();
+
+    RETURN_IF_ERROR(ParseWrapper(max_tuples, &num_tuples));
+    COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
+
+    // Because the processes of parsing JSON, materializing tuples, and even reading data
+    // are intertwined, it can be expensive to accurately time them individually.
+    // Therefore, we use this method to measure the time it takes to materialize tuples.
+    // Please note that the value obtained will always be inflated because the time it
+    // takes to parse JSON is also included.
+    // TODO: find a better way.
+    COUNTER_SET(scan_node_->materialize_tuple_timer(),
+        parse_json_timer_->value() - get_buffer_timer_->value());
+
+    RETURN_IF_ERROR(CommitRows(num_tuples_materialized_, row_batch));
+
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
+  }
+
+  if (scanner_state_ >= PAST_SCANNING || scan_node_->ReachedLimitShared()) {
+    eos_ = true;
+    scanner_state_ = FINISHED;
+  }
+  return Status::OK();
+}
+
+Status HdfsJsonScanner::FindFirstTuple() {
+  DCHECK_EQ(scanner_state_, OPENED);
+  if (stream_->scan_range()->offset() == 0) {
+    scanner_state_ = SCANNING;
+    return Status::OK();
+  }
+  SCOPED_TIMER(parse_json_timer_);
+  if (json_parser_->MoveToNextJson()) {
+    scanner_state_ = SCANNING;
+    DCHECK_OK(buffer_status_);
+  }
+  return buffer_status_;
+}
+
+Status HdfsJsonScanner::ParseWrapper(int max_tuples, int* num_tuples) {
+  DCHECK(json_parser_->IsTidy());
+  SCOPED_TIMER(parse_json_timer_);
+  Status status = json_parser_->Parse(max_tuples, num_tuples);
+  RETURN_IF_ERROR(buffer_status_);
+  return status;
+}
+
+bool HdfsJsonScanner::HandleConvertError(const SlotDescriptor* desc, const char* data,
+    int len) {
+  error_in_row_ = true;
+  tuple_->SetNull(desc->null_indicator_offset());
+  if (state_->LogHasSpace() || state_->abort_on_error()) {
+    const HdfsTableDescriptor* table = scan_node_->hdfs_table();
+    constexpr int max_view_len = 50;
+    string data_view = string(data, std::min(len, max_view_len));
+    if (len > max_view_len) data_view += "...";
+    string msg = Substitute("Error converting column: $0.$1.$2, type: $3, data: '$4'",
+        table->database(), table->name(), table->GetColumnDesc(desc).name(),
+        desc->type().DebugString(), data_view);
+
+    if (state_->LogHasSpace()) {
+      state_->LogError(ErrorMsg(TErrorCode::GENERAL, msg), 2);
+    }
+
+    if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(msg);
+  }
+  return parse_status_.ok();
+}
+
+Status HdfsJsonScanner::HandleError(rapidjson::ParseErrorCode error, size_t offset) {
+  if (error == rapidjson::kParseErrorTermination) {
+    DCHECK(!parse_status_.ok());
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  } else {
+    RETURN_IF_ERROR(state_->LogOrReturnError(ErrorMsg::Init(TErrorCode::GENERAL,
+        Substitute("Parse Json file error, file: $0, offset: $1, cause by: $2",
+        stream_->filename(), stream_->scan_range()->offset() + offset,
+        rapidjson::GetParseError_En(error)))));
+  }
+  return Status::OK();
+}
+
+static bool AllWhitespaceBeforeNewline(uint8_t* begin, int64_t len) {
+  DCHECK(len >= 0);
+  uint8_t* end = begin + len;
+  while (begin != end) {
+    switch (*begin++) {
+      case '\r':
+      case '\t':
+      case ' ': break;
+      case '\n': return true;
+      default: return false;
+    }
+  }
+  return false;
+}
+
+void HdfsJsonScanner::GetNextBuffer(const char** begin, const char** end) {
+  DCHECK(*begin == *end);
+  DCHECK(decompressor_.get() == nullptr) << "Not support decompress json yet.";
+  SCOPED_TIMER(get_buffer_timer_);
+
+  // The eosr indicates that we have scanned all data within the scan range. If the
+  // scanner state is OPENED, it means that we encountered eosr in FindFirstTuple(),
+  // indicating that there is no start of a tuple in this scan range; this is handled by
+  // the previous scan range in the same file. If the scanner state is SCANNING, it means
+  // that we have completed scanning the data within the range, and need to read the next
+  // range of data to complete the scan.
+  if (stream_->eosr()) {
+    if (scanner_state_ == OPENED) return;
+    if (scanner_state_ == SCANNING) scanner_state_ = PAST_SCANNING;
+  }
+
+  if (stream_->eof() || scanner_state_ == FINISHED) return;
+
+  uint8_t* next_buffer_begin;
+  int64_t next_buffer_size;
+  if (scanner_state_ == PAST_SCANNING) {
+    // In the PAST_SCANNING state, we read only a small block of data at a time to scan.
+    // If the parser completes the parsing of the last json object, it will exit the loop
+    // due to BreakParse().
+    if (!stream_->GetBytes(NEXT_BLOCK_READ_SIZE, &next_buffer_begin, &next_buffer_size,
+        &buffer_status_)) {
+      DCHECK(!buffer_status_.ok());
+      return;
+    }
+
+    // A special case is when the first character of the next scan range is a newline
+    // character (perhaps with other whitespace characters before it). Our scan should
+    // stop at the first newline character in the next range, while the parser skips
+    // whitespace characters. If we don't handle this case, the first line of the next
+    // range will be scanned twice. Therefore, we need to return directly here to inform
+    // the parser that eos has been reached.
+    if (AllWhitespaceBeforeNewline(next_buffer_begin, next_buffer_size)) {
+      scanner_state_ = FINISHED;
+      return;
+    }
+  } else {
+    buffer_status_ = stream_->GetBuffer(false, &next_buffer_begin, &next_buffer_size);
+    RETURN_VOID_IF_ERROR(buffer_status_);
+  }
+
+  *begin = reinterpret_cast<char*>(next_buffer_begin);
+  *end = *begin + next_buffer_size;
+}
+
+void HdfsJsonScanner::InitRow() {
+  InitTuple(template_tuple_, tuple_);
+}
+
+void HdfsJsonScanner::SubmitRow() {
+  tuple_row_->SetTuple(0, tuple_);
+  if (EvalConjuncts(tuple_row_)) {
+    ++num_tuples_materialized_;
+    tuple_ = next_tuple(tuple_byte_size(), tuple_);
+    tuple_row_ = next_row(tuple_row_);
+  }
+  if (UNLIKELY(error_in_row_)) {
+    LogRowParseError();
+    error_in_row_ = false;
+  }
+}
+
+bool HdfsJsonScanner::InvokeWriteSlot(const SlotDescriptor* slot_desc, const char* data,
+    int len) {
+  // TODO: Support invoking the codegen'd WriteSlot.
+  const AuxColumnType& aux_type =
+      scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
+  if (LIKELY(text_converter_->WriteSlot(slot_desc, &aux_type, tuple_, data, len, true,
+      false, current_pool_))) {
+    return true;
+  }
+  return HandleConvertError(slot_desc, data, len);
+}
+
+void HdfsJsonScanner::AddNull(int index) {
+  const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
+  tuple_->SetNull(slot_desc->null_indicator_offset());
+}
+
+bool HdfsJsonScanner::AddBool(int index, bool value) {
+  const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
+  if (UNLIKELY(slot_desc->type().type != TYPE_BOOLEAN)) {
+    return InvokeWriteSlot(slot_desc, value ? "true" : "false", value ? 4 : 5);
+  }
+  void* slot = tuple_->GetSlot(slot_desc->tuple_offset());
+  *reinterpret_cast<bool*>(slot) = value;
+  return true;
+}
+
+bool HdfsJsonScanner::AddString(int index, const char* str, uint32_t len) {
+  const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
+  return InvokeWriteSlot(slot_desc, str, len);
+}
+
+bool HdfsJsonScanner::AddNumber(int index, const char* str, uint32_t len) {
+  const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[index];
+  return InvokeWriteSlot(slot_desc, str, len);
+}
diff --git a/be/src/exec/json/hdfs-json-scanner.h b/be/src/exec/json/hdfs-json-scanner.h
new file mode 100644
index 000000000..47b3cc45d
--- /dev/null
+++ b/be/src/exec/json/hdfs-json-scanner.h
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_HDFS_JSON_SCANNER_H
+#define IMPALA_EXEC_HDFS_JSON_SCANNER_H
+
+#include <memory>
+
+#include "common/status.h"
+#include "exec/json/json-parser.h"
+#include "exec/text/hdfs-text-scanner.h"
+#include "runtime/mem-pool.h"
+#include "runtime/tuple-row.h"
+#include "util/runtime-profile-counters.h"
+
+
+namespace impala {
+
+class ScannerContext;
+struct HdfsFileDesc;
+
+/// HdfsScanner implementation that understands json-formatted records.
+///
+/// Splitting json files:
+/// Similar to HdfsTextScanner, this scanner handles json files split across multiple
+/// blocks/scan ranges. Note that the split can occur anywhere in the file, e.g. in the
+/// middle of a row. Each scanner starts materializing tuples right after the first row
+/// delimiter found in the scan range, and stops at the first row delimiter occurring past
+/// the end of the scan range. If no delimiter is found in the scan range, the scanner
+/// doesn't materialize anything. This scheme ensures that every row is materialized by
+/// exactly one scanner.
+///
+/// Error handling:
+/// During the process of scanning JSON, two types of errors may occur. The
+/// first type is data conversion errors, such as attempting to convert a non-numeric
+/// string to a number. These errors are detected and reported by TextConverter, and
+/// handled by HdfsJsonScanner::HandleConvertError(), which sets the slot to NULL
+/// and records the error. If abort_on_error is true, it also returns false to
+/// the event handling function of JsonParser, causing the JsonParser to interrupt parsing
+/// and report the kParseErrorTermination error code. The second type of error occurs when
+/// the JSON itself has formatting errors, such as missing colons or commas or including
+/// invalid values. These errors are detected and reported as corresponding error codes by
+/// JsonParser, and they are handled by HdfsJsonScanner::HandleError(). It will record the
+/// error situation, and if abort_on_error is true, also returns an error status to
+/// JsonParser, causing the query to be aborted.
+class HdfsJsonScanner : public HdfsScanner {
+ public:
+  HdfsJsonScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
+  virtual ~HdfsJsonScanner();
+
+  /// Implementation of HdfsScanner interface.
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+  virtual void Close(RowBatch* row_batch) override;
+
+  THdfsFileFormat::type file_format() const override {
+    return THdfsFileFormat::JSON;
+  }
+
+  /// Issue io manager byte ranges for 'files'.
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
+      const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT {
+    return HdfsTextScanner::IssueInitialRanges(scan_node, files);
+  }
+
+  /// Return true if we have builtin support for scanning JSON files compressed with this
+  /// codec.
+  static bool HasBuiltinSupport(THdfsCompression::type compression) {
+    // TODO: Support scanning compressed json file.
+    DCHECK_EQ(compression, THdfsCompression::NONE);
+    if (compression == THdfsCompression::NONE) return true;
+    return false;
+  }
+
+ private:
+  friend class JsonParser<HdfsJsonScanner>;
+
+  virtual Status InitNewRange() override WARN_UNUSED_RESULT;
+
+  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
+
+  /// Find the start of the first tuple in this scan range. If successful, advances the
+  /// scanner state to SCANNING. Otherwise, consume the entire scan range without updating
+  /// the scanner state (e.g. if there is a very large JSON object).
+  Status FindFirstTuple() WARN_UNUSED_RESULT;
+
+  /// A wrapper around JsonParser::Parse() that checks for buffer errors before returning
+  /// its Status. If there are any buffer errors, they will be returned instead of the
+  /// Status from JsonParser::Parse(). Because GetNextBuffer() is called as a callback,
+  /// we need this approach to check for buffer errors.
+  Status ParseWrapper(int max_tuples, int* num_tuples) WARN_UNUSED_RESULT;
+
+  /// Called when adding a value to the tuple fails. Sets the target slot to null and
+  /// reports the error message. Returns false if necessary to abort the scan.
+  bool HandleConvertError(const SlotDescriptor* desc, const char* data, int len);
+
+  /// Scanner state, advances as the scanning process progresses.
+  enum ScannerState {
+    CREATED,
+
+    /// Scanner is opened, ready to work.
+    OPENED,
+
+    /// Enter this state after finding the position of the first tuple in ScanRange,
+    /// indicating that the scanner is scanning data normally.
+    SCANNING,
+
+    /// Indicates that the scan range has been scanned, but the first row of data past the
+    /// end of the scan range still needs to be read for parsing.
+    PAST_SCANNING,
+
+    /// Scanning is finished, no more data to process.
+    FINISHED
+  } scanner_state_;
+
+  /// The returned status when fetching data from stream. Set in GetNextBuffer() and
+  /// checked in ParseWrapper().
+  Status buffer_status_;
+
+  /// TupleRow pointer of current row batch, set in GetNextInternal().
+  TupleRow* tuple_row_;
+
+  /// MemPool pointer of current row batch, set in GetNextInternal().
+  MemPool* current_pool_;
+
+  /// This is used to indicate whether an error has occurred in the currently parsed row.
+  bool error_in_row_;
+
+  /// A counter for the number of tuples materialized during a single ParseWrapper() call.
+  int num_tuples_materialized_;
+
+  /// Time spent invoking JsonParser. This roughly includes the time for parsing the JSON,
+  /// materializing the tuple, and reading the data.
+  RuntimeProfile::Counter* parse_json_timer_;
+
+  /// Time spent getting the next buffer in GetNextBuffer().
+  RuntimeProfile::Counter* get_buffer_timer_;
+
+  const static int NEXT_BLOCK_READ_SIZE = 64 * 1024; //bytes
+
+  /// JsonParser is a class template that implements parsing of JSON data stream. It is
+  /// supplied with a data buffer by its template parameter Scanner, and some callback
+  /// functions for assembling the parsing results into row format, see details in the
+  /// JsonParser comment.
+  std::unique_ptr<JsonParser<HdfsJsonScanner>> json_parser_;
+
+  /// Invoke WriteSlot (CodeGen or Interpret version) to materialize the slot, and handle
+  /// errors when conversion fails.
+  inline bool InvokeWriteSlot(const SlotDescriptor* slot_desc, const char* data, int len);
+
+  /// All functions below are callback functions provided for JsonParser, with their
+  /// specific uses described in the JsonParser comment.
+  Status HandleError(rapidjson::ParseErrorCode error, size_t offset);
+  bool BreakParse() { return scanner_state_ == PAST_SCANNING; }
+  void GetNextBuffer(const char** begin, const char** end);
+  void InitRow();
+  void SubmitRow();
+  void AddNull(int index);
+  bool AddBool(int index, bool value);
+  bool AddString(int index, const char* str, uint32_t len);
+  bool AddNumber(int index, const char* str, uint32_t len);
+};
+
+}
+
+#endif
diff --git a/be/src/exec/json/json-parser-test.cc b/be/src/exec/json/json-parser-test.cc
new file mode 100644
index 000000000..576403e34
--- /dev/null
+++ b/be/src/exec/json/json-parser-test.cc
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+#include <string>
+#include <sstream>
+#include <algorithm>
+
+#include "exec/json/json-parser.h"
+#include "testutil/gtest-util.h"
+
+using namespace std;
+
+namespace impala {
+
+class JsonParserTest : public ::testing::TestWithParam<int> {
+ public:
+  JsonParserTest() {
+    random_shuffle(json_data_.begin(), json_data_.end());
+    stringstream data_stream, result_stream;
+    for (const auto& p : json_data_) {
+      data_stream << p.first << '\n';
+      if (p.second.empty()) continue;
+      result_stream << p.second << '\n';
+    }
+    data_ = data_stream.str();
+    result_ = result_stream.str();
+  }
+
+  virtual void SetUp() override {
+    data_pos_ = 0;
+    stream_size_ = GetParam();
+  }
+
+  void NextBuffer(const char** begin, const char** end) {
+    EXPECT_EQ(*begin, *end);
+    *begin = *end = nullptr;
+    if (data_pos_ >= data_.size()) return;
+    *begin = data_.data() + data_pos_;
+    size_t len = min(stream_size_, data_.size() - data_pos_);
+    *end = *begin + len;
+    data_pos_ += len;
+  }
+
+  const vector<string>& schema() const { return schema_; }
+
+  const string& result() const { return result_; }
+
+ private:
+  size_t data_pos_ = 0;
+  size_t stream_size_;
+  string data_;
+  string result_;
+
+  vector<string> schema_ = {"name", "bool", "score", "address"};
+  vector<pair<string, string>> json_data_ = {
+    // Normal Json
+    {R"({"name": "Linda", "bool": true, "score": 76.3, "address": "Chicago"})",
+        "Linda, true, 76.3, Chicago, "},
+    {R"({"name": "Mike", "bool": null, "score": 82.1, "address": "Dallas"})",
+        "Mike, null, 82.1, Dallas, "},
+    {R"({"name": "Sara", "bool": false, "score": 94.8, "address": "Seattle"})",
+        "Sara, false, 94.8, Seattle, "},
+
+    // String with escape or special char.
+    {R"({"name": "Joe\nJoe", "bool": null, "score": 100, "address": "{New}\t{York}"})",
+        "Joe\nJoe, null, 100, {New}\t{York}, "},
+    {R"({"name": "$}~{$", "bool": false, "score": 95.2, "address": "\"{Los} \\Angeles"})",
+        "$}~{$, false, 95.2, \"{Los} \\Angeles, "},
+    {R"({"name": "A\"}{\"A", "bool": true, "score": 79.4, "address": "[]()[{}{}]"})",
+        "A\"}{\"A, true, 79.4, []()[{}{}], "},
+
+    // Missing or out-of-order columns.
+    {R"({"name": "Grace", "bool": false, "score": 92.3})",
+        "Grace, false, 92.3, null, "},
+    {R"({"bool": false, "score": 90.5, "name": "Emily"})",
+        "Emily, false, 90.5, null, "},
+    {R"({"score": 87.6, "bool": false, "name": "David", "address": "Boston"})",
+        "David, false, 87.6, Boston, "},
+
+    // Column with complex type.
+    {R"({"name": "Bob", "bool": true, "score": 78.9, "complex": [1, {"a2": [4, 5, 6]}]})",
+        "Bob, true, 78.9, null, "},
+    {R"({"name": "Peter", "object": {"array": [1, 2, 3], "object": {"empty": []}}})",
+        "Peter, null, null, null, "},
+    {R"({"name": "Sophia", "array": [1, 2, 3, {"test": null}], "address": "San Diego"})",
+        "Sophia, null, null, San Diego, "},
+
+    // Top-level string, number, or array instead of an object.
+    {R"("{\"name\": \"Aisha\", \"bool\": true, \"score\": 86.1}")", ""},
+    {R"(-1234.56789)", ""},
+    {R"(["Pavel", 123e2, {"test": null}, {"a1": [1, 2, "{abc, [123]}"]}])", ""}
+  };
+};
+
+INSTANTIATE_TEST_CASE_P(StreamSize, JsonParserTest, ::testing::Values(1, 16, 256));
+
+TEST_P(JsonParserTest, Basic) {
+  SimpleJsonScanner js(schema(), [this](const char** begin, const char** end) {
+    this->NextBuffer(begin, end);
+  });
+  constexpr int max_rows = 10;
+  int num_rows = 0;
+  do {
+    EXPECT_OK(js.Scan(max_rows, &num_rows));
+    EXPECT_GE(num_rows, 0);
+    EXPECT_LE(num_rows, max_rows);
+  } while (num_rows);
+  EXPECT_EQ(result(), js.Result());
+}
+
+}
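
The test above feeds the parser through NextBuffer(), which hands out the input in slices of at most stream_size_ bytes and signals exhaustion with an empty range. A minimal standalone sketch of that feeding pattern follows. ChunkedSource and Drain are hypothetical names introduced only for illustration; they are not part of this commit and do not depend on Impala or RapidJSON.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical helper (not part of the commit): hands out 'data' in slices of
// at most 'chunk_size' bytes, mimicking JsonParserTest::NextBuffer().
class ChunkedSource {
 public:
  ChunkedSource(std::string data, size_t chunk_size)
    : data_(std::move(data)), chunk_size_(chunk_size) {
    assert(chunk_size_ > 0);
  }

  // Sets [*begin, *end) to the next slice. An empty range (*begin == *end)
  // signals that the input is exhausted.
  void Next(const char** begin, const char** end) {
    *begin = *end = nullptr;
    if (pos_ >= data_.size()) return;
    *begin = data_.data() + pos_;
    size_t len = std::min(chunk_size_, data_.size() - pos_);
    *end = *begin + len;
    pos_ += len;
  }

 private:
  std::string data_;
  size_t chunk_size_;
  size_t pos_ = 0;
};

// Drains a source, concatenating the slices. The number of non-empty slices
// seen is returned through 'num_chunks'.
inline std::string Drain(ChunkedSource* src, int* num_chunks) {
  std::string out;
  *num_chunks = 0;
  const char* begin = nullptr;
  const char* end = nullptr;
  for (src->Next(&begin, &end); begin != end; src->Next(&begin, &end)) {
    out.append(begin, end);
    ++(*num_chunks);
  }
  return out;
}
```

Running the same input through several chunk sizes, as the test does with 1, 16 and 256, exercises the case where a JSON token is split across two buffers.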
diff --git a/be/src/exec/json/json-parser.cc b/be/src/exec/json/json-parser.cc
new file mode 100644
index 000000000..423dcd094
--- /dev/null
+++ b/be/src/exec/json/json-parser.cc
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/json/json-parser.h"
+
+#include "exec/json/hdfs-json-scanner.h"
+#include "gutil/strings/ascii_ctype.h"
+
+using namespace impala;
+using namespace rapidjson;
+
+using std::vector;
+using std::string;
+
+#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return false
+
+template <class Scanner>
+JsonParser<Scanner>::JsonParser(const vector<string>& schema, Scanner* scanner)
+  : num_fields_(schema.size()), scanner_(scanner), stream_(this) {
+  field_found_.resize(num_fields_);
+  int index = 0;
+  for (const string& name : schema) {
+    string lower_case_key(name.size(), 0);
+    for (size_t i = 0; i < name.size(); ++i) {
+      lower_case_key[i] = ascii_tolower(name[i]);
+    }
+    DCHECK(field_indexs_.find(lower_case_key) == field_indexs_.end());
+    field_indexs_[lower_case_key] = index++;
+  }
+}
+
+template <class Scanner>
+void JsonParser<Scanner>::ResetParser() {
+  row_initialized_ = false;
+  array_depth_ = 0;
+  object_depth_ = 0;
+  current_field_idx_ = -1;
+  memset(field_found_.data(), false, field_found_.size());
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::IsTidy() {
+  // Check if there are no unclosed arrays or objects.
+  bool no_unclosed_elements = array_depth_ == 0 && object_depth_ == 0;
+
+  // Check that no field or row is currently being handled.
+  bool no_handling_field_or_row = current_field_idx_ == -1 && !row_initialized_;
+
+  // Check that no fields have been found, i.e. 'field_found_' is all false.
+  bool no_fields_found = field_found_.size() == 0 ||
+      (field_found_[0] == false &&
+      !memcmp(field_found_.data(), field_found_.data() + 1, field_found_.size() - 1));
+
+  // Return true if all conditions are met, indicating that the parser is tidy
+  return no_unclosed_elements && no_handling_field_or_row && no_fields_found;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::MoveToNextJson() {
+  while (!stream_.Eos()) {
+    if (stream_.Take() == '\n') {
+      return true;
+    }
+  }
+  return false;
+}
+
+template <class Scanner>
+Status JsonParser<Scanner>::Parse(int max_rows, int* num_rows) {
+  while (*num_rows < max_rows) {
+    constexpr auto parse_flags = kParseNumbersAsStringsFlag | kParseStopWhenDoneFlag;
+    // Reads characters from the stream, parses them and publishes events to this
+    // handler (JsonParser).
+    reader_.Parse<parse_flags>(stream_, *this);
+
+    if (UNLIKELY(reader_.HasParseError())) {
+      if (reader_.GetParseErrorCode() == kParseErrorDocumentEmpty) {
+        // If the first non-whitespace character the parser sees in a parse is '\0', it
+        // assumes the end of the stream and reports the error code
+        // "kParseErrorDocumentEmpty". If the stream has indeed reached its end, we can
+        // return normally. However, if file corruption caused a '\0' to be inserted
+        // between JSON objects, the stream hasn't actually ended, and we should
+        // continue scanning.
+        if (UNLIKELY(!stream_.Eos())) {
+          DCHECK_EQ(stream_.Peek(), '\0');
+          stream_.Take();
+          continue;
+        }
+        DCHECK(IsTidy());
+        return Status::OK();
+      }
+      // Call the scanner to handle the error. If the error is handled successfully,
+      // continue parsing. Since parsing has been interrupted and we may have stopped in
+      // the middle of a JSON object, we need to move to the starting position of the
+      // next object and reset the parser state before starting the next parse.
+      // There is one special case: when the error code reported by the parser is
+      // kParseErrorObjectMissCommaOrCurlyBracket, the current JSON object is missing a
+      // closing curly bracket. In this case, we should already be stopped at the end
+      // of this JSON object and there is no need to move to the starting position of
+      // the next object. Calling MoveToNextJson() anyway would very likely cause us to
+      // skip a complete row.
+      RETURN_IF_ERROR(scanner_->HandleError(reader_.GetParseErrorCode(),
+          reader_.GetErrorOffset()));
+      if (row_initialized_) FinishRow();
+      if (reader_.GetParseErrorCode() != kParseErrorObjectMissCommaOrCurlyBracket) {
+        MoveToNextJson();
+      }
+      ResetParser();
+    }
+
+    ++(*num_rows);
+    if (UNLIKELY(scanner_->BreakParse())) break;
+  }
+
+  return Status::OK();
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::Key(const char* str, uint32_t len, bool copy) {
+  if (object_depth_ == 1 && array_depth_ == 0) {
+    DCHECK_EQ(current_field_idx_, -1);
+    string lower_case_key(len, 0);
+    for (uint32_t i = 0; i < len; ++i) {
+      lower_case_key[i] = ascii_tolower(str[i]);
+    }
+    auto iter = field_indexs_.find(lower_case_key);
+    current_field_idx_ = (iter == field_indexs_.end()) ? -1 : iter->second;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::StartObject() {
+  ++object_depth_;
+  if (object_depth_ == 1 && array_depth_ == 0) {
+    scanner_->InitRow();
+    row_initialized_ = true;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::EndObject(uint32_t mem_count) {
+  --object_depth_;
+  if (UNLIKELY(IsRequiredField())) {
+    // Complex types are not supported yet; treat them as null for now.
+    // TODO: support complex types.
+    scanner_->AddNull(current_field_idx_);
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  if (object_depth_ == 0 && array_depth_ == 0) {
+    FinishRow();
+    row_initialized_ = false;
+    memset(field_found_.data(), false, field_found_.size());
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::StartArray() {
+  ++array_depth_;
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::EndArray(uint32_t mem_count) {
+  --array_depth_;
+  if (UNLIKELY(IsRequiredField())) {
+    // Complex types are not supported yet; treat them as null for now.
+    // TODO: support complex types.
+    scanner_->AddNull(current_field_idx_);
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::Null() {
+  if (IsRequiredField()) {
+    scanner_->AddNull(current_field_idx_);
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::Bool(bool boolean) {
+  if (IsRequiredField()) {
+    RETURN_IF_FALSE(scanner_->AddBool(current_field_idx_, boolean));
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::RawNumber(const char* str, uint32_t len, bool copy) {
+  if (IsRequiredField()) {
+    RETURN_IF_FALSE(scanner_->AddNumber(current_field_idx_, str, len));
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  return true;
+}
+
+template <class Scanner>
+bool JsonParser<Scanner>::String(const char* str, uint32_t len, bool copy) {
+  if (IsRequiredField()) {
+    RETURN_IF_FALSE(scanner_->AddString(current_field_idx_, str, len));
+    field_found_[current_field_idx_] = true;
+    current_field_idx_ = -1;
+  }
+  return true;
+}
+
+template class impala::JsonParser<SimpleJsonScanner>;
+template class impala::JsonParser<HdfsJsonScanner>;
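
The constructor, Key() and FinishRow() together implement a case-insensitive mapping from JSON keys to schema positions, with null-filling for columns that a given object omits. The following is a minimal sketch of that bookkeeping in isolation, under the assumption that values arrive as plain key/value string pairs rather than SAX events. RowAssembler and ToLowerAscii are hypothetical names used only for illustration and are not part of this commit.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: lower-cases ASCII letters and leaves other bytes as-is.
inline std::string ToLowerAscii(const std::string& s) {
  std::string out(s);
  for (char& c : out) {
    if (c >= 'A' && c <= 'Z') c = static_cast<char>(c - 'A' + 'a');
  }
  return out;
}

// Hypothetical stand-in for the parser's field bookkeeping.
class RowAssembler {
 public:
  explicit RowAssembler(const std::vector<std::string>& schema)
    : row_(schema.size()), found_(schema.size(), 0) {
    int index = 0;
    for (const std::string& name : schema) {
      bool inserted = index_.emplace(ToLowerAscii(name), index++).second;
      assert(inserted);  // Duplicate column names are not expected.
      (void)inserted;
    }
  }

  // Stores 'value' if 'key' names a schema column; unknown keys are ignored.
  void Add(const std::string& key, const std::string& value) {
    auto it = index_.find(ToLowerAscii(key));
    if (it == index_.end()) return;
    row_[it->second] = value;
    found_[it->second] = 1;
  }

  // Fills columns that were never set with "null", returns the row, and
  // clears the 'found' flags so the assembler can be reused for the next row.
  std::vector<std::string> Finish() {
    for (size_t i = 0; i < row_.size(); ++i) {
      if (!found_[i]) row_[i] = "null";
    }
    std::vector<std::string> out = row_;
    std::fill(found_.begin(), found_.end(), 0);
    return out;
  }

 private:
  std::unordered_map<std::string, int> index_;
  std::vector<std::string> row_;
  std::vector<char> found_;
};
```

Because the flags are cleared in Finish(), stale values from a previous row are overwritten with "null" unless the next object sets them again, which is the same effect the parser gets from resetting field_found_ after each row.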
diff --git a/be/src/exec/json/json-parser.h b/be/src/exec/json/json-parser.h
new file mode 100644
index 000000000..a6db77578
--- /dev/null
+++ b/be/src/exec/json/json-parser.h
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <sstream>
+#include <vector>
+#include <unordered_map>
+
+#include "common/compiler-util.h"
+#include "common/status.h"
+#include "rapidjson/error/en.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/reader.h"
+
+namespace impala {
+
+/// A template class to assist in parsing JSON, using the member function Parse() to parse
+/// the JSON text and convert it to a row format defined by the schema. The JSON text
+/// consists of multiple JSON objects, each of which is parsed into one row of data.
+///
+/// Scanner is a class defined for input and output for this class, and it needs to
+/// implement the following member functions:
+///
+/// Returns a boolean value to indicate whether Parse() should end parsing and return.
+/// It is called once after parsing each object.
+///   bool BreakParse();
+///
+/// Provides input for the parser, with 'begin' and 'end' being the start and end
+/// positions of the next buffer to be parsed.
+///   void GetNextBuffer(const char** begin, const char** end);
+///
+/// Handles errors. This function is called when the following functions encounter an
+/// error (return false). If this function returns OK, parsing continues; otherwise, the
+/// error status is returned by Parse().
+///   Status HandleError(rapidjson::ParseErrorCode error, size_t offset);
+///
+/// The following functions materialize output tuples. Functions with void return type
+/// must succeed. Functions with bool return type return true on success, and return false
+/// to stop parsing the whole scan range.
+///
+/// Called when starting to parse a new object, initializes a new row of data.
+///   void InitRow();
+///
+/// Called when finishing parsing an object, submits a row of data.
+///   void SubmitRow();
+///
+/// Called when encountering a null value during parsing. Index is the index of the key
+/// for this value in the schema, and so on for the other following functions.
+///   void AddNull(int index);
+///   bool AddBool(int index, bool value);
+///   bool AddString(int index, const char* str, uint32_t len);
+///   bool AddNumber(int index, const char* str, uint32_t len);
+///
+/// This parser is implemented based on the SAX-style API provided by Rapidjson.
+/// This class provides event handler functions for the rapidjson::Reader to achieve JSON
+/// parsing. See more details in:
+///   https://rapidjson.org/md_doc_sax.html
+///   https://rapidjson.org/classrapidjson_1_1_handler.html
+template <class Scanner>
+class JsonParser {
+public:
+  /// A stream of characters used to wrap the buffer, wrapping the buffer into a format
+  /// acceptable by RapidJson.
+  class CharStream {
+  public:
+    typedef char Ch;
+
+    CharStream(JsonParser* parser) : parser_(parser) { }
+
+    /// Determines whether the stream has ended. After the current buffer is parsed,
+    /// GetNextBuffer is called to request more data from Scanner. It is only considered
+    /// the end of the stream when Scanner cannot provide more data.
+    ALWAYS_INLINE bool Eos() {
+      if (LIKELY(current_ != end_)) return false;
+      tell_ = Tell();
+      parser_->GetNextBuffer(&current_, &end_);
+      begin_ = current_;
+      return current_ == end_;
+    }
+
+    ALWAYS_INLINE Ch Peek() {
+      return UNLIKELY(Eos()) ? '\0' : *current_;
+    }
+
+    ALWAYS_INLINE Ch Take() {
+      return UNLIKELY(Eos()) ? '\0' : *current_++;
+    }
+
+    ALWAYS_INLINE size_t Tell() const {
+      return static_cast<size_t>(current_ - begin_) + tell_;
+    }
+
+    /// The following functions are only required for the output stream, so we don't need
+    /// to implement them here. However, to avoid compilation errors, we must explicitly
+    /// indicate them as not available.
+    Ch* PutBegin() { CHECK(false); return 0; }
+    void Put(Ch) { CHECK(false); }
+    size_t PutEnd(Ch*) { CHECK(false); return 0; }
+    void Flush() { CHECK(false); }
+
+  private:
+    JsonParser* parser_;
+    const Ch* current_ = nullptr;
+    const Ch* begin_ = nullptr;
+    const Ch* end_ = nullptr;
+    size_t tell_ = 0;
+  };
+
+  JsonParser(const std::vector<std::string>& schema, Scanner* scanner);
+
+  void ResetParser();
+
+  /// A debug function that checks whether the parser is tidy. If it is not, it means that
+  /// unexpected errors may have occurred in the parser.
+  bool IsTidy();
+
+  /// Consume char stream to find the start of the first tuple in this scan range. Return
+  /// true if found. This function works under the premise that there is only one JSON
+  /// object per line in the JSON file and there are no newlines in the JSON object.
+  bool MoveToNextJson();
+
+  /// Uses callbacks provided by Scanner to parse JSON data and convert it to row
+  /// format. Returns in the following cases:
+  /// 1. Maximum parsing row limit max_rows is reached.
+  /// 2. No more data needs to be parsed (end of stream is reached).
+  /// 3. An error is encountered when converting to a row, or a parsing error (caused by
+  ///    invalid JSON format, etc.), and Scanner returns an error status after handling
+  ///    the error.
+  /// 4. Scanner's BreakParse() indicates the need to end parsing.
+  Status Parse(int max_rows, int* num_rows);
+
+  CharStream& stream() { return stream_; }
+
+private:
+  friend class rapidjson::GenericReader<rapidjson::UTF8<>, rapidjson::UTF8<>>;
+
+  inline void FinishRow() {
+    DCHECK(row_initialized_);
+    for (int i = 0; i < num_fields_; ++i) {
+      if (UNLIKELY(!field_found_[i])) {
+        scanner_->AddNull(i);
+      }
+    }
+    scanner_->SubmitRow();
+  }
+
+  inline void GetNextBuffer(const char** begin, const char** end) {
+    scanner_->GetNextBuffer(begin, end);
+  }
+
+  inline bool IsRequiredField() {
+    return current_field_idx_ != -1 && object_depth_ == 1 && array_depth_ == 0;
+  }
+
+  /// The following functions are event handlers provided for Rapidjson SAX. When parsing
+  /// a JSON, the corresponding handlers will be called upon encountering the
+  /// corresponding element. The main processing flow for a row of data is as follows:
+  /// 1. Call StartObject() at the beginning of the JSON object to initialize a new row.
+  /// 2. Call Key() upon encountering a key to find the index of its field in the schema
+  ///    and update current_field_idx_.
+  /// 3. Call the corresponding type processing function upon encountering a value to add
+  ///    the value to the position pointed to by current_field_idx_ in the row.
+  /// 4. Call EndObject() upon reaching the end of the JSON object. Add null values for
+  ///    schema fields not found in the object, and submit this row.
+  bool Key(const char* str, uint32_t len, bool copy);
+  bool StartObject();
+  bool EndObject(uint32_t mem_count);
+  bool StartArray();
+  bool EndArray(uint32_t mem_count);
+  bool Null();
+  bool Bool(bool boolean);
+  bool RawNumber(const char* str, uint32_t len, bool copy);
+  bool String(const char* str, uint32_t len, bool copy);
+
+  /// We use the kParseNumbersAsStringsFlag flag for parsing, which outputs numeric
+  /// values as strings (by calling RawNumber). Therefore, the following handler functions
+  /// will never be called and need not be implemented. However, to avoid compilation
+  /// errors, we still need to explicitly mark them as not available.
+  bool Int(int i) { CHECK(false); return false; }
+  bool Uint(unsigned i) { CHECK(false); return false; }
+  bool Int64(int64_t i) { CHECK(false); return false; }
+  bool Uint64(uint64_t i) { CHECK(false); return false; }
+  bool Double(double d) { CHECK(false); return false; }
+
+  /// The number of fields in the schema.
+  const size_t num_fields_;
+
+  /// Scanner pointer used for invoking callback functions.
+  Scanner* scanner_;
+
+  /// Character stream that wraps the JSON data buffer.
+  CharStream stream_;
+
+  /// Mapping of field names to field positions, generated based on the schema and used to
+  /// locate field positions when assembling rows.
+  std::unordered_map<std::string, int> field_indexs_;
+
+  /// RapidJson's SAX-style JSON parser.
+  rapidjson::Reader reader_;
+
+  /// This is mainly used to determine if we have an unfinished row when an error occurs.
+  bool row_initialized_;
+
+  /// Counter used to record the nesting depth of the current JSON array or object during
+  /// parsing.
+  int array_depth_;
+  int object_depth_;
+
+  /// Used to record the current field's position in the row during parsing.
+  /// Updated by Key() based on 'field_indexs_', consumed and reset by other processors.
+  /// -1 indicates a not required field (not in the schema).
+  int current_field_idx_;
+
+  /// Used to record which fields have been found in the current row during parsing.
+  std::vector<char> field_found_;
+};
+
+/// A simple class for testing JsonParser.
+class SimpleJsonScanner {
+public:
+  using GetBufferFunc = std::function<void(const char**, const char**)>;
+
+  SimpleJsonScanner(const std::vector<std::string>& schema, GetBufferFunc get_buffer)
+     : parser_(schema, this), get_buffer_(get_buffer) {
+    parser_.ResetParser();
+    current_row_.resize(schema.size());
+  }
+
+  Status Scan(int max_row, int* num_rows) {
+    *num_rows = 0;
+    if (!parser_.IsTidy()) return Status("Parser is not tidy");
+    RETURN_IF_ERROR(parser_.Parse(max_row, num_rows));
+    return Status::OK();
+  }
+
+  std::string Result() { return result_.str(); }
+
+private:
+  friend class JsonParser<SimpleJsonScanner>;
+
+  Status HandleError(rapidjson::ParseErrorCode error, size_t offset) {
+    return Status::OK();
+  }
+
+  bool BreakParse() {
+    return false;
+  }
+
+  void GetNextBuffer(const char** begin, const char** end) {
+    get_buffer_(begin, end);
+  }
+
+  void InitRow() { }
+
+  void SubmitRow() {
+    for (const auto& s : current_row_) result_ << s << ", ";
+    result_ << '\n';
+  }
+
+  void AddNull(int index) {
+    current_row_[index] = "null";
+  }
+
+  bool AddBool(int index, bool b) {
+    current_row_[index] = (b ? "true" : "false");
+    return true;
+  }
+
+  bool AddString(int index, const char* b, uint32_t len) {
+    current_row_[index] = std::string(b, len);
+    return true;
+  }
+
+  bool AddNumber(int index, const char* b, uint32_t len) {
+    current_row_[index] = std::string(b, len);
+    return true;
+  }
+
+  std::vector<std::string> current_row_;
+  std::stringstream result_;
+  JsonParser<SimpleJsonScanner> parser_;
+  GetBufferFunc get_buffer_;
+};
+
+} // namespace impala
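
CharStream lets the reader see a sequence of separately supplied buffers as one continuous stream: Eos() refills lazily when the current buffer runs out, and Tell() stays monotonic across refills by accumulating the bytes of earlier buffers. The sketch below reproduces that idea without RapidJSON, together with a line-skipping loop in the spirit of MoveToNextJson(). RefillStream, MakeChunkedRefill and SkipLine are hypothetical names for illustration only; as in the header, the refill callback is assumed to tolerate repeated calls after the input is exhausted.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for JsonParser::CharStream, decoupled from RapidJSON.
class RefillStream {
 public:
  using Refill = std::function<void(const char**, const char**)>;

  explicit RefillStream(Refill refill) : refill_(std::move(refill)) {
    assert(refill_);
  }

  // Returns true only when the current buffer is used up and the refill
  // callback cannot supply more data.
  bool Eos() {
    if (current_ != end_) return false;
    consumed_ = Tell();  // Remember the bytes of the finished buffer.
    refill_(&current_, &end_);
    begin_ = current_;
    return current_ == end_;
  }

  char Peek() { return Eos() ? '\0' : *current_; }
  char Take() { return Eos() ? '\0' : *current_++; }

  // Offset from the start of the whole stream, not of the current buffer.
  size_t Tell() const {
    return static_cast<size_t>(current_ - begin_) + consumed_;
  }

 private:
  Refill refill_;
  const char* current_ = nullptr;
  const char* begin_ = nullptr;
  const char* end_ = nullptr;
  size_t consumed_ = 0;
};

// Hypothetical refill callback that serves '*data' in slices of 'chunk' bytes.
// '*data' must outlive the returned callback.
inline RefillStream::Refill MakeChunkedRefill(const std::string* data, size_t chunk) {
  size_t pos = 0;
  return [data, chunk, pos](const char** begin, const char** end) mutable {
    *begin = *end = nullptr;
    if (pos >= data->size()) return;
    size_t len = std::min(chunk, data->size() - pos);
    *begin = data->data() + pos;
    *end = *begin + len;
    pos += len;
  };
}

// Consumes characters up to and including the next '\n'. Returns false if the
// stream ends first.
inline bool SkipLine(RefillStream* stream) {
  while (!stream->Eos()) {
    if (stream->Take() == '\n') return true;
  }
  return false;
}
```

Keeping the refill inside Eos() means Peek() and Take() never have to know about buffer boundaries, at the cost of one pointer comparison per character on the fast path.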
diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h
index ba27fefa2..93d6bcde8 100644
--- a/be/src/exec/text-converter.inline.h
+++ b/be/src/exec/text-converter.inline.h
@@ -66,7 +66,6 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc,
 
       bool reuse_data = type.IsVarLenStringType() &&
           !(len != 0 && (copy_string || need_escape));
-      if (type.type == TYPE_CHAR) reuse_data &= (buffer_len <= len);
 
       bool base64_decode = false;
       if (auxType->IsBinaryStringSubtype()) {
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index ae9d61ba9..ea09764d4 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -109,6 +109,7 @@ DECLARE_string(file_metadata_reload_properties);
 DECLARE_string(java_weigher);
 DECLARE_int32(iceberg_reload_new_files_threshold);
 DECLARE_bool(enable_skipping_older_events);
+DECLARE_bool(enable_json_scanner);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -425,6 +426,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
   cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
   cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
+  cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
   return Status::OK();
 }
 
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index b12b8eff5..13edc28e0 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -170,6 +170,7 @@ testdata/data/widerow.txt
 testdata/data/local_tbl/00000.txt
 testdata/data/hudi_parquet/*
 testdata/data/iceberg_test/*
+testdata/data/json_test/*
 testdata/data/sfs_d2.txt
 testdata/data/sfs_d4.txt
 testdata/datasets/functional/functional_schema_template.sql
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3943021df..fa933424f 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -262,4 +262,6 @@ struct TBackendGflags {
   115: required i32 iceberg_reload_new_files_threshold
 
   116: required bool enable_skipping_older_events;
+
+  117: required bool enable_json_scanner
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 332ddc52d..0d29d06f5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -248,8 +248,8 @@ public enum HdfsFileFormat {
   public boolean isSplittable(HdfsCompression compression) {
     switch (this) {
       case TEXT:
-        return compression == HdfsCompression.NONE;
       case JSON:
+        return compression == HdfsCompression.NONE;
       case RC_FILE:
       case SEQUENCE_FILE:
       case AVRO:
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 2da3b32e5..08a6b718d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -467,22 +467,26 @@ public class HdfsScanNode extends ScanNode {
    * a partition that has a format for which we do not support complex types,
    * regardless of whether a complex-typed column is actually referenced
    * in the query.
+   * 2) if we are scanning a compressed JSON file or the JSON scanner is disabled.
    */
   @Override
   protected void checkForSupportedFileFormats() throws NotImplementedException {
     Preconditions.checkNotNull(desc_);
     Preconditions.checkNotNull(desc_.getTable());
 
-    // Since JSON file format is not yet supported, this block throws an
-    // exception. Once JSON file format will be supported, appropriate changes
-    // can be made under this block.
     for (FeFsPartition part: partitions_) {
-      HdfsFileFormat format = part.getFileFormat();
-      if (format.equals(HdfsFileFormat.JSON)) {
-        throw new NotImplementedException("Scan of table " + desc_.getTableName() +
-                " in format 'JSON' is not supported.");
+      if (!part.getFileFormat().equals(HdfsFileFormat.JSON)) continue;
+      if (!BackendConfig.INSTANCE.isJsonScannerEnabled()) {
+        throw new NotImplementedException(
+            "JSON scans are disabled by --enable_json_scanner flag.");
+      }
+      for (FileDescriptor fd: part.getFileDescriptors()) {
+        if (fd.getFileCompression() == HdfsCompression.NONE) continue;
+        throw new NotImplementedException(
+            "Scanning compressed Json file is not implemented yet: " + fd.getPath());
       }
     }
+
     Column firstComplexTypedCol = null;
     for (Column col: desc_.getTable().getColumns()) {
       if (col.getType().isComplexType()) {
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index f5c7570bf..93025fee3 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -428,4 +428,8 @@ public class BackendConfig {
   public int icebergReloadNewFilesThreshold() {
     return backendCfg_.iceberg_reload_new_files_threshold;
   }
+
+  public boolean isJsonScannerEnabled() {
+    return backendCfg_.enable_json_scanner;
+  }
 }
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 35b2e8322..e9d6183a6 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -211,12 +211,14 @@ function load-custom-schemas {
   mkdir -p ${TMP_DIR}/chars_formats_avro_snap \
    ${TMP_DIR}/chars_formats_parquet \
    ${TMP_DIR}/chars_formats_text \
-   ${TMP_DIR}/chars_formats_orc_def
+   ${TMP_DIR}/chars_formats_orc_def \
+   ${TMP_DIR}/chars_formats_json
 
   ln -s ${IMPALA_HOME}/testdata/data/chars-formats.avro ${TMP_DIR}/chars_formats_avro_snap
   ln -s ${IMPALA_HOME}/testdata/data/chars-formats.parquet ${TMP_DIR}/chars_formats_parquet
   ln -s ${IMPALA_HOME}/testdata/data/chars-formats.orc ${TMP_DIR}/chars_formats_orc_def
   ln -s ${IMPALA_HOME}/testdata/data/chars-formats.txt ${TMP_DIR}/chars_formats_text
+  ln -s ${IMPALA_HOME}/testdata/data/chars-formats.json ${TMP_DIR}/chars_formats_json
 
   # File used by CreateTableLikeOrc tests
   ln -s ${IMPALA_HOME}/testdata/data/alltypes_non_acid.orc ${SCHEMA_TMP_DIR}
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 750fb3a8d..bd498d429 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -206,6 +206,7 @@ FILE_FORMAT_MAP = {
   'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
   'kudu': "KUDU",
   'iceberg': "ICEBERG",
+  'json': "JSONFILE",
   }
 
 HIVE_TO_AVRO_TYPE_MAP = {
@@ -823,7 +824,8 @@ def generate_statements(output_name, test_vectors, sections,
         print('HDFS path:', data_path, 'contains data. Data loading can be skipped.')
       else:
         print('HDFS path:', data_path, 'does not exist or is empty. Data will be loaded.')
-        if not db_suffix:
+        load_from_json_file = file_format == 'json' and table_name.endswith('_json')
+        if not db_suffix or load_from_json_file:
           if load:
             hive_output.load_base.append(build_load_statement(load, db_name,
                                                               db_suffix, table_name))
diff --git a/testdata/bin/load-dependent-tables.sql b/testdata/bin/load-dependent-tables.sql
index a75c4af33..57af65ccc 100644
--- a/testdata/bin/load-dependent-tables.sql
+++ b/testdata/bin/load-dependent-tables.sql
@@ -91,6 +91,13 @@ ROW FORMAT delimited fields terminated by ','  escaped by '\\'
 STORED AS TEXTFILE
 LOCATION '/test-warehouse/chars_formats_text';
 
+DROP TABLE IF EXISTS functional_json.chars_formats;
+CREATE EXTERNAL TABLE functional_json.chars_formats
+(cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+ROW FORMAT delimited fields terminated by ','  escaped by '\\'
+STORED AS JSONFILE
+LOCATION '/test-warehouse/chars_formats_json';
+
 DROP TABLE IF EXISTS functional_avro_snap.chars_formats;
 CREATE EXTERNAL TABLE functional_avro_snap.chars_formats
 (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
diff --git a/testdata/data/chars-formats.json b/testdata/data/chars-formats.json
new file mode 100644
index 000000000..8832bacca
--- /dev/null
+++ b/testdata/data/chars-formats.json
@@ -0,0 +1,3 @@
+{"cs":"abcde","cl":"88db79c70974e02deb3f01cfdcc5daae2078f21517d1021994f12685c0144addae3ce0dbd6a540b55b88af68486251fa6f0c8f9f94b3b1b4bc64c69714e281f388db79c70974","vc":"variable length"}
+{"cs":"abc ","cl":"8d3fffddf79e9a232ffd19f9ccaa4d6b37a6a243dbe0f23137b108a043d9da13121a9b505c804956b22e93c7f93969f4a7ba8ddea45bf4aab0bebc8f814e09918d3fffddf79e","vc":"abc"}
+{"cs":"abcdef","cl":"68f8c4575da360c32abb46689e58193a0eeaa905ae6f4a5e6c702a6ae1db35a6f86f8222b7a5489d96eb0466c755b677a64160d074617096a8c6279038bc720468f8c4575da3","vc":"b2fe9d4638503a57f93396098f24103a20588631727d0f0b5016715a3f6f2616628f09b1f63b23e484396edf949d9a1c307dbe11f23b971afd75b0f639d8a3f1"}
\ No newline at end of file
diff --git a/testdata/data/json_test/complex.json b/testdata/data/json_test/complex.json
new file mode 100644
index 000000000..461e0db47
--- /dev/null
+++ b/testdata/data/json_test/complex.json
@@ -0,0 +1,5 @@
+{"id":1,"name":"Alice","spouse":null,"child":[]}
+{"id":2,"name":"Bob","spouse":{"id":null},"child":[{"id":3,"name":"Charlie"},{"id":4,"name":"David"}]}
+{"id":5,"name":"Emily","spouse":{"spouse":"Emily","id":null},"child":[{"id":6,"name":"Frank","spouse":{"id":7},"child":[{"id":8,"name":"Grace"},{"id":9,"name":"Henry"}]},{"id":10,"name":"Isabel","spouse":{},"child":[{"id":11,"name":"Jack"},{"id":12,"name":"Kate"}]}]}
+{"id":13,"name":"Liam","spouse":{"id":14,"name":"Mia"},"child":[]}
+{"id":15,"name":"Nora","spouse":{"id":16,"name":"Oliver"},"child":[{"id":17,"name":"Peter"},{"id":18,"name":"Quinn"},{"id":19,"name":"Rose"}]}
\ No newline at end of file
diff --git a/testdata/data/json_test/malformed.json b/testdata/data/json_test/malformed.json
new file mode 100644
index 000000000..688387c05
--- /dev/null
+++ b/testdata/data/json_test/malformed.json
@@ -0,0 +1,18 @@
+{"bool_col":true,"int_col":0,"float_col":"abc","string_col":"abc123"}
+{"bool_col":False,"int_col":1,"float_col":0.1,"string_col":abc123}
+{"bool_col":true,"int_col":2,:0.2,"string_col":"abc123"}
+{"bool_col":false,"int_col":3,"float_col":0.3,"string_col""abc123"}
+{"bool_col":true,"int_col":4,"float_col":0.4"string_col":"abc123"}
+{"bool_col":false,"int_col":5,"float_col":0.6e10000,"string_col":"abc123"}
+{"bool_col":true,"int_col":6,"float_col":0.,"string_col":"abc123"}
+{"bool_col":false,"int_col":7,"float_col":0.7e,"string_col":"abc123"}
+{"bool_col":true,"int_col":8,"float_col":0.8,"string_col":"abc123"
+{"float_col":0.9,"int_col":9,"string_col":"abc123","bool_col":false}
+{"bool_col":true,"int_col":-1,"float_col":-1,"string_col":"abc123","int_col":10,"float_col":1.0}
+123.456
+"abc123"
+{ }
+[ ]
+( )
+{"string_col":"abc123"}
+["string_col", "abc123"]
\ No newline at end of file
diff --git a/testdata/data/json_test/multiline.json b/testdata/data/json_test/multiline.json
new file mode 100644
index 000000000..883997c96
--- /dev/null
+++ b/testdata/data/json_test/multiline.json
@@ -0,0 +1,17 @@
+{"Id": 1, "Key": "normal object", "Value": "abcdefg"}
+{"Id": 2, "Key": "multiline string", "Value": "abcd
+efg"}
+{"Id": 3, "Key": "multiline number", "Value": 1234
+567}
+{"Id": 4, "Key": "multiline object1",
+  "Value": "abcdefg"}
+{"Id": 5, "Key": "multiline object2", "Value"
+  :"abcdefg"}
+{"Id": 6, "Key": "multiline object3", "Value":
+  "abcdefg"}
+{
+  "Id": 7,
+  "Key": "multiline object4",
+  "Value": "abcdefg"
+}
+{"Id": 8, "Key": "one line multiple objects", "Value": "obj1"}{"Id": 9, "Key": "one line multiple objects", "Value": "obj2"}
\ No newline at end of file
diff --git a/testdata/data/json_test/overflow.json b/testdata/data/json_test/overflow.json
new file mode 100644
index 000000000..6b1dc9789
--- /dev/null
+++ b/testdata/data/json_test/overflow.json
@@ -0,0 +1,6 @@
+{"tinyint_col":1,"smallint_col":2,"int_col":3,"bigint_col":4,"float_col":5.5,"double_col":6.6,"decimal0_col":123456789.1234,"decimal1_col":99999999999999999999999999999999999999,"decimal2_col":0.00000000000000000000000000000000000001}
+{"tinyint_col":1000,"smallint_col":100000,"int_col":10000000000000000,"bigint_col":10000000000000000000,"float_col":1e1000000,"double_col":1e10000,"decimal0_col":1234567890.1234,"decimal1_col":100000000000000000000000000000000000000,"decimal2_col":0.000000000000000000000000000000000000009}
+{"tinyint_col":-1000,"smallint_col":-100000,"int_col":-10000000000000000,"bigint_col":-10000000000000000000,"float_col":-1e1000000,"double_col":-1e10000,"decimal0_col":-123456789.12341,"decimal1_col":-100000000000000000000000000000000000000,"decimal2_col":-0.000000000000000000000000000000000000009}
+{"tinyint_col":"1","smallint_col":"2","int_col":"3","bigint_col":"4","float_col":"5.5","double_col":"6.6","decimal0_col":"123456789.1234","decimal1_col":"99999999999999999999999999999999999999","decimal2_col":"0.00000000000000000000000000000000000001"}
+{"tinyint_col":"1000","smallint_col":"100000","int_col":"10000000000000000","bigint_col":"10000000000000000000","float_col":"1e1000000","double_col":"1e10000","decimal0_col":"1234567890.1234","decimal1_col":"100000000000000000000000000000000000000","decimal2_col":"0.000000000000000000000000000000000000009"}
+{"tinyint_col":"-1000","smallint_col":"-100000","int_col":"-10000000000000000","bigint_col":"-10000000000000000000","float_col":"-1e1000000","double_col":"-1e10000","decimal0_col":"-123456789.12341","decimal1_col":"-100000000000000000000000000000000000000","decimal2_col":"-0.000000000000000000000000000000000000009"}
\ No newline at end of file
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 7befa6eb2..387618008 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -1390,6 +1390,66 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/overflow.txt' OVERWRITE INTO
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+complex_json
+---- COLUMNS
+id int
+name string
+spouse string
+child string
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/json_test/complex.json' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+multiline_json
+---- COLUMNS
+id int
+key string
+value string
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/json_test/multiline.json' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+malformed_json
+---- COLUMNS
+bool_col boolean
+int_col int
+float_col float
+string_col string
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/json_test/malformed.json' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+overflow_json
+---- COLUMNS
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+decimal0_col DECIMAL(13,4)
+decimal1_col DECIMAL(38,0)
+decimal2_col DECIMAL(38,38)
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/json_test/overflow.json' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 widerow
 ---- COLUMNS
 string_col string
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 3967867bf..2418404fc 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -188,6 +188,14 @@ table_name:widerow, constraint:exclude, table_format:hbase/none/none
 # with no corresponding LOAD statement.
 table_name:nullformat_custom, constraint:exclude, table_format:hbase/none/none
 
+# complex_json contains complex JSON objects, multiline_json contains multi-line JSON objects,
+# malformed_json contains malformed JSON objects, and overflow_json contains overflowing
+# numeric values. These tables are used in the JSON scan tests.
+table_name:complex_json, constraint:restrict_to, table_format:json/none/none
+table_name:multiline_json, constraint:restrict_to, table_format:json/none/none
+table_name:malformed_json, constraint:restrict_to, table_format:json/none/none
+table_name:overflow_json, constraint:restrict_to, table_format:json/none/none
+
 # Decimal can only be tested on formats Impala can write to (text and parquet).
 # TODO: add Avro once Hive or Impala can write Avro decimals
 table_name:decimal_tbl, constraint:restrict_to, table_format:text/none/none
@@ -198,16 +206,20 @@ table_name:decimal_tbl, constraint:restrict_to, table_format:kudu/none/none
 table_name:decimal_tiny, constraint:restrict_to, table_format:kudu/none/none
 table_name:decimal_tbl, constraint:restrict_to, table_format:orc/def/block
 table_name:decimal_tiny, constraint:restrict_to, table_format:orc/def/block
+table_name:decimal_tbl, constraint:restrict_to, table_format:json/none/none
+table_name:decimal_tiny, constraint:restrict_to, table_format:json/none/none
 
 table_name:decimal_rtf_tbl, constraint:restrict_to, table_format:text/none/none
 table_name:decimal_rtf_tbl, constraint:restrict_to, table_format:parquet/none/none
 table_name:decimal_rtf_tbl, constraint:restrict_to, table_format:kudu/none/none
 table_name:decimal_rtf_tbl, constraint:restrict_to, table_format:orc/def/block
+table_name:decimal_rtf_tbl, constraint:restrict_to, table_format:json/none/none
 
 table_name:decimal_rtf_tiny_tbl, constraint:restrict_to, table_format:text/none/none
 table_name:decimal_rtf_tiny_tbl, constraint:restrict_to, table_format:parquet/none/none
 table_name:decimal_rtf_tiny_tbl, constraint:restrict_to, table_format:kudu/none/none
 table_name:decimal_rtf_tiny_tbl, constraint:restrict_to, table_format:orc/def/block
+table_name:decimal_rtf_tiny_tbl, constraint:restrict_to, table_format:json/none/none
 
 table_name:avro_decimal_tbl, constraint:restrict_to, table_format:avro/snap/block
 
@@ -287,6 +299,7 @@ table_name:table_with_header_insert, constraint:restrict_to, table_format:parque
 # IMPALA-7368/IMPALA-7370/IMPALA-8198 adds DATE support for text, hbase, parquet and avro.
 # IMPALA-8801 adds DATE support for ORC.
 # IMPALA-8800 adds DATE support for Kudu.
+# IMPALA-10798 adds DATE support for JSON.
 # Other file-formats will be introduced later.
 table_name:date_tbl, constraint:restrict_to, table_format:parquet/none/none
 table_name:date_tbl, constraint:restrict_to, table_format:avro/snap/block
@@ -298,6 +311,7 @@ table_name:date_tbl, constraint:restrict_to, table_format:text/bzip/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/gzip/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/snap/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/def/block
+table_name:date_tbl, constraint:restrict_to, table_format:json/none/none
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/none/none
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/bzip/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/gzip/block
diff --git a/testdata/workloads/functional-query/functional-query_core.csv b/testdata/workloads/functional-query/functional-query_core.csv
index 7118e3f35..ac4b0f7c6 100644
--- a/testdata/workloads/functional-query/functional-query_core.csv
+++ b/testdata/workloads/functional-query/functional-query_core.csv
@@ -7,3 +7,4 @@ file_format:parquet, dataset: functional, compression_codec: none, compression_t
 file_format:avro, dataset: functional, compression_codec: snap, compression_type: block
 file_format:hbase, dataset:functional, compression_codec:none, compression_type:none
 file_format:kudu, dataset:functional, compression_codec:none, compression_type:none
+file_format:json, dataset:functional, compression_codec:none, compression_type:none
diff --git a/testdata/workloads/functional-query/functional-query_dimensions.csv b/testdata/workloads/functional-query/functional-query_dimensions.csv
index ecb6a0e0c..d3e85fdc3 100644
--- a/testdata/workloads/functional-query/functional-query_dimensions.csv
+++ b/testdata/workloads/functional-query/functional-query_dimensions.csv
@@ -1,4 +1,4 @@
-file_format: text,seq,rc,avro,parquet,orc,hbase,kudu
+file_format: text,seq,rc,avro,parquet,orc,hbase,kudu,json
 dataset: functional
 compression_codec: none,def,gzip,bzip,snap
 compression_type: none,block,record
diff --git a/testdata/workloads/functional-query/functional-query_exhaustive.csv b/testdata/workloads/functional-query/functional-query_exhaustive.csv
index 148dd5ba3..1740b1a58 100644
--- a/testdata/workloads/functional-query/functional-query_exhaustive.csv
+++ b/testdata/workloads/functional-query/functional-query_exhaustive.csv
@@ -25,3 +25,4 @@ file_format: parquet, dataset: functional, compression_codec: none, compression_
 file_format: orc, dataset: functional, compression_codec: def, compression_type: block
 file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
 file_format: kudu, dataset: functional, compression_codec: none, compression_type: none
+file_format: json, dataset: functional, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/functional-query/functional-query_pairwise.csv b/testdata/workloads/functional-query/functional-query_pairwise.csv
index e046a0904..2d9b72423 100644
--- a/testdata/workloads/functional-query/functional-query_pairwise.csv
+++ b/testdata/workloads/functional-query/functional-query_pairwise.csv
@@ -7,3 +7,4 @@ file_format: parquet, dataset: functional, compression_codec: none, compression_
 file_format: orc, dataset: functional, compression_codec: def, compression_type: block
 file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
 file_format: kudu, dataset: functional, compression_codec: none, compression_type: none
+file_format: json, dataset: functional, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-json-scan-node-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-json-scan-node-errors.test
new file mode 100644
index 000000000..0d7066de5
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-json-scan-node-errors.test
@@ -0,0 +1,180 @@
+====
+---- QUERY
+select * from alltypeserror order by id
+---- ERRORS
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '0'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.bool_col, type: BOOLEAN, data: 'errtrue'
+Error converting column: functional_json.alltypeserror.tinyint_col, type: TINYINT, data: 'err9'
+Error converting column: functional_json.alltypeserror.smallint_col, type: SMALLINT, data: 'err9'
+Error converting column: functional_json.alltypeserror.int_col, type: INT, data: 'err9'
+Error converting column: functional_json.alltypeserror.bigint_col, type: BIGINT, data: 'err90'
+Error converting column: functional_json.alltypeserror.float_col, type: FLOAT, data: 'err9.000000'
+Error converting column: functional_json.alltypeserror.double_col, type: DOUBLE, data: 'err90.900000'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '0000-01-01 00:00:00'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.double_col, type: DOUBLE, data: 'err70.700000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.float_col, type: FLOAT, data: 'err6.000000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.bigint_col, type: BIGINT, data: 'err50'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.int_col, type: INT, data: 'err4'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.smallint_col, type: SMALLINT, data: 'err3'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '2002-14-10 00:00:00'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.tinyint_col, type: TINYINT, data: 'err2'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '1999-10-10 90:10:10'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.bool_col, type: BOOLEAN, data: 'errfalse'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.float_col, type: FLOAT, data: 'xyz3.000000'
+Error converting column: functional_json.alltypeserror.double_col, type: DOUBLE, data: 'xyz30.300000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.tinyint_col, type: TINYINT, data: 'xyz5'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '0009-01-01 00:00:00'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '0'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.double_col, type: DOUBLE, data: 'xyz70.700000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '2020-20-10 10:10:10.123'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.bool_col, type: BOOLEAN, data: 't\rue'
+Error converting column: functional_json.alltypeserror.tinyint_col, type: TINYINT, data: 'err30'
+Error converting column: functional_json.alltypeserror.smallint_col, type: SMALLINT, data: 'err30'
+Error converting column: functional_json.alltypeserror.int_col, type: INT, data: 'err30'
+Error converting column: functional_json.alltypeserror.bigint_col, type: BIGINT, data: 'err300'
+Error converting column: functional_json.alltypeserror.float_col, type: FLOAT, data: 'err30..000000'
+Error converting column: functional_json.alltypeserror.double_col, type: DOUBLE, data: 'err300.900000'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '0000-01-01 00:00:00'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.int_col, type: INT, data: 'abc9'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.tinyint_col, type: TINYINT, data: 'abc7'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.int_col, type: INT, data: 'abc5'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '2020-10-10 10:70:10.123'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.smallint_col, type: SMALLINT, data: 'abc3'
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '2020-10-10 60:10:10.123'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserror.timestamp_col, type: TIMESTAMP, data: '2020-10-40 10:10:10.123'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+---- RESULTS
+0,NULL,NULL,0,0,0,0.0,0.0,'01/01/09','0',NULL,2009,1
+1,NULL,NULL,1,1,10,1.0,10.1,'01/01/09','1',1999-10-10 00:00:00,2009,1
+2,true,NULL,NULL,2,20,2.0,20.2,'01/01/09','2',NULL,2009,1
+3,false,3,NULL,NULL,30,3.0,30.3,'01/01/09','3',NULL,2009,1
+4,true,4,4,NULL,NULL,4.0,40.4,'01/01/09','4',1970-01-01 00:00:00,2009,1
+5,false,5,5,5,NULL,NULL,50.5,'01/01/09','5',1970-01-01 00:00:00,2009,1
+6,true,6,6,6,60,NULL,NULL,'01/01/09','6',1970-01-01 00:00:00,2009,1
+7,NULL,NULL,7,7,70,7.0,NULL,'01/01/09','7',1970-01-01 00:00:00,2009,1
+8,false,NULL,NULL,8,80,8.0,80.8,'01/01/09','8',1970-01-01 00:00:00,2009,1
+9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/09','9',NULL,2009,1
+10,NULL,NULL,NULL,0,0,0.0,0.0,'02/01/09','0',2009-01-01 00:00:00,2009,2
+11,false,NULL,NULL,NULL,10,1.0,10.1,'02/01/09','1',2009-01-01 00:00:00,2009,2
+12,true,2,NULL,NULL,NULL,2.0,20.2,'02/01/09','2',2009-01-01 00:00:00,2009,2
+13,false,3,3,NULL,NULL,NULL,NULL,'02/01/09','3',2009-01-01 00:00:00,2009,2
+14,true,4,4,4,40,NULL,NULL,'02/01/09','4',2009-01-01 00:00:00,2009,2
+15,false,NULL,5,5,50,5.0,50.5,'02/01/09','5',NULL,2009,2
+16,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'02/01/09','6',NULL,2009,2
+17,false,7,7,7,70,7.0,NULL,'02/01/09','7',2009-01-01 00:00:00,2009,2
+18,true,8,8,8,80,8.0,80.8,'02/01/09','8',2009-01-01 00:00:00,2009,2
+19,false,9,9,9,90,9.0,90.9,'02/01/09','9',2009-01-01 00:00:00,2009,2
+20,true,0,0,0,0,0.0,0.0,'03/01/09','0',2020-10-10 10:10:10.123000000,2009,3
+21,false,1,1,1,10,1.0,10.1,'03/01/09','1',NULL,2009,3
+22,true,2,2,2,20,2.0,20.2,'03/01/09','2',NULL,2009,3
+23,false,3,NULL,3,30,3.0,30.3,'03/01/09','3',NULL,2009,3
+24,true,4,4,4,40,4.0,40.4,'03/01/09','4',NULL,2009,3
+25,false,5,5,NULL,50,5.0,50.5,'03/01/09','5',2020-10-10 10:10:10.123000000,2009,3
+26,true,6,6,6,60,6.0,60.6,'03/01/09','6',2020-10-10 10:10:10.123000000,2009,3
+27,false,NULL,7,7,70,7.0,70.7,'03/01/09','7',2020-10-10 10:10:10.123000000,2009,3
+28,true,8,8,8,80,8.0,80.8,'03/01/09','8',2020-10-10 10:10:10.123000000,2009,3
+29,false,9,9,NULL,90,9.0,90.9,'03/01/09','9',2020-10-10 10:10:10.123000000,2009,3
+30,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','10',NULL,2009,3
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, string, timestamp, int, int
+====
+---- QUERY
+select * from alltypeserrornonulls order by id
+---- ERRORS
+Error converting column: functional_json.alltypeserrornonulls.timestamp_col, type: TIMESTAMP, data: '123456'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.bool_col, type: BOOLEAN, data: 'errfalse'
+Error converting column: functional_json.alltypeserrornonulls.timestamp_col, type: TIMESTAMP, data: '1990-00-01 10:10:10'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.tinyint_col, type: TINYINT, data: 'err2'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.smallint_col, type: SMALLINT, data: 'err3'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.int_col, type: INT, data: 'err4'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.bigint_col, type: BIGINT, data: 'err50'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.float_col, type: FLOAT, data: 'err6.000000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.double_col, type: DOUBLE, data: 'err70.700000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.bool_col, type: BOOLEAN, data: 'errtrue'
+Error converting column: functional_json.alltypeserrornonulls.tinyint_col, type: TINYINT, data: 'err9'
+Error converting column: functional_json.alltypeserrornonulls.smallint_col, type: SMALLINT, data: 'err9'
+Error converting column: functional_json.alltypeserrornonulls.int_col, type: INT, data: 'err9'
+Error converting column: functional_json.alltypeserrornonulls.bigint_col, type: BIGINT, data: 'err90'
+Error converting column: functional_json.alltypeserrornonulls.float_col, type: FLOAT, data: 'err9.000000'
+Error converting column: functional_json.alltypeserrornonulls.double_col, type: DOUBLE, data: 'err90.900000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.float_col, type: FLOAT, data: 'xyz3.000000'
+Error converting column: functional_json.alltypeserrornonulls.double_col, type: DOUBLE, data: 'xyz30.300000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.tinyint_col, type: TINYINT, data: 'xyz5'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.double_col, type: DOUBLE, data: 'xyz70.700000'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.smallint_col, type: SMALLINT, data: 'abc3'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.int_col, type: INT, data: 'abc5'
+Error converting column: functional_json.alltypeserrornonulls.timestamp_col, type: TIMESTAMP, data: '2012-Mar-22 11:20:01.123'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.tinyint_col, type: TINYINT, data: 'abc7'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.timestamp_col, type: TIMESTAMP, data: '11:20:01.123 2012-03-22 '
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+Error converting column: functional_json.alltypeserrornonulls.int_col, type: INT, data: 'abc9'
+row_regex: .*Error parsing row: file: $NAMENODE/.* before offset: \d+
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',NULL,2009,1
+1,NULL,1,1,1,10,1,10.1,'01/01/09','1',NULL,2009,1
+2,true,NULL,2,2,20,2,20.2,'01/01/09','2',2012-03-22 11:20:01.123000000,2009,1
+3,false,3,NULL,3,30,3,30.3,'01/01/09','3',2012-03-22 11:20:01.123000000,2009,1
+4,true,4,4,NULL,40,4,40.4,'01/01/09','4',2012-03-22 11:20:01.123000000,2009,1
+5,false,5,5,5,NULL,5,50.5,'01/01/09','5',2012-03-22 11:20:01.123000000,2009,1
+6,true,6,6,6,60,NULL,60.6,'01/01/09','6',2012-03-22 11:20:01.123000000,2009,1
+7,false,7,7,7,70,7,NULL,'01/01/09','7',2012-03-22 11:20:01.123000000,2009,1
+8,false,8,8,8,80,8,80.8,'01/01/09','8',2012-03-22 11:20:01.123000000,2009,1
+9,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/09','9',2012-03-22 11:20:01.123000000,2009,1
+10,true,0,0,0,0,0,0,'02/01/09','0',2012-03-22 11:20:01.123000000,2009,2
+11,false,1,1,1,10,1,10.1,'02/01/09','1',2012-03-22 11:20:01.123000000,2009,2
+12,true,2,2,2,20,2,20.2,'02/01/09','2',2012-03-22 11:20:01.123000000,2009,2
+13,false,3,3,3,30,NULL,NULL,'02/01/09','3',2012-03-22 11:20:01.123000000,2009,2
+14,true,4,4,4,40,4,40.4,'02/01/09','4',2012-03-22 11:20:01.123000000,2009,2
+15,false,NULL,5,5,50,5,50.5,'02/01/09','5',2012-03-22 11:20:01.123000000,2009,2
+16,true,6,6,6,60,6,60.6,'02/01/09','6',2012-03-22 11:20:01.123000000,2009,2
+17,false,7,7,7,70,7,NULL,'02/01/09','7',2012-03-22 11:20:01.123000000,2009,2
+18,true,8,8,8,80,8,80.8,'02/01/09','8',2012-03-22 11:20:01.123000000,2009,2
+19,false,9,9,9,90,9,90.90000000000001,'02/01/09','9',2012-03-22 11:20:01.123000000,2009,2
+20,true,0,0,0,0,0,0,'03/01/09','0',2012-03-22 11:20:01.123000000,2009,3
+21,false,1,1,1,10,1,10.1,'03/01/09','1',2012-03-22 11:20:01.123000000,2009,3
+22,true,2,2,2,20,2,20.2,'03/01/09','2',2012-03-22 11:20:01.123000000,2009,3
+23,false,3,NULL,3,30,3,30.3,'03/01/09','3',2012-03-22 11:20:01.123000000,2009,3
+24,true,4,4,4,40,4,40.4,'03/01/09','4',2012-03-22 11:20:01.123000000,2009,3
+25,false,5,5,NULL,50,5,50.5,'03/01/09','5',NULL,2009,3
+26,true,6,6,6,60,6,60.6,'03/01/09','6',2012-03-22 11:20:01.123000000,2009,3
+27,false,NULL,7,7,70,7,70.7,'03/01/09','7',2012-03-22 11:20:01.123000000,2009,3
+28,true,8,8,8,80,8,80.8,'03/01/09','8',NULL,2009,3
+29,false,9,9,NULL,90,9,90.90000000000001,'03/01/09','9',2012-03-22 00:00:00,2009,3
+---- TYPES
+int, boolean, tinyint, smallint, int, bigint, float, double, string, string, timestamp, int, int
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/complex_json.test b/testdata/workloads/functional-query/queries/QueryTest/complex_json.test
new file mode 100644
index 000000000..1a1ddbc3d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/complex_json.test
@@ -0,0 +1,14 @@
+====
+---- QUERY
+# Testing scanning of complex JSON. JsonParser and HdfsJsonScanner do not support complex
+# types yet, so they will be set as null for now.
+select id, name, spouse, child from complex_json
+---- TYPES
+int, string, string, string
+---- RESULTS
+1,'Alice','NULL','NULL'
+2,'Bob','NULL','NULL'
+5,'Emily','NULL','NULL'
+13,'Liam','NULL','NULL'
+15,'Nora','NULL','NULL'
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-json-scanner.test b/testdata/workloads/functional-query/queries/QueryTest/disable-json-scanner.test
new file mode 100644
index 000000000..3eab0a8e9
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/disable-json-scanner.test
@@ -0,0 +1,7 @@
+====
+---- QUERY
+# Test that running with JSON scanner disabled fails gracefully.
+select * from functional_json.alltypes
+---- CATCH
+JSON scans are disabled by --enable_json_scanner flag.
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test b/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test
new file mode 100644
index 000000000..155d6f0ac
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/malformed_json.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+# Tests scanning of malformed JSON. If a data conversion fails, the scanner reports the
+# error and fills the field with NULL. If the JSON format itself is incorrect, rapidjson
+# stops parsing and reports the corresponding error. Because parsing errors are difficult
+# to recover from, we fill the remaining fields with NULL, jump to the next line, and
+# continue normal parsing.
+select bool_col, int_col, float_col, string_col from malformed_json
+---- TYPES
+boolean, int, float, string
+---- RESULTS
+true,0,NULL,'abc123'
+NULL,NULL,NULL,'NULL'
+true,2,NULL,'NULL'
+false,3,0.300000011921,'NULL'
+true,4,0.40000000596,'NULL'
+false,5,NULL,'NULL'
+true,6,NULL,'NULL'
+false,7,NULL,'NULL'
+true,8,0.800000011921,'abc123'
+false,9,0.899999976158,'abc123'
+true,10,1.0,'abc123'
+NULL,NULL,NULL,'NULL'
+NULL,NULL,NULL,'abc123'
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test b/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test
new file mode 100644
index 000000000..4368dd8db
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/multiline_json.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# Testing scanning of multi-line JSON. rapidjson can handle cases where line breaks appear
+# in JSON (except for those that appear in numbers and strings), so in most cases it can
+# scan multi-line JSON. However, line breaks in strings and numbers are treated as invalid
+# values, and the scanner returns null. Additionally, it should be noted that if the line
+# break in the multi-line JSON is near the beginning of the scan range, it may cause the
+# parser to misjudge the starting position of the first complete JSON (because it always
+# starts parsing from the position after the first line break). This usually has no
+# effect (except reporting an error), but if there happens to be a sub-object
+# immediately after the line break, it will cause an extra line of data to be scanned.
+# If the line break in the multi-line JSON is also at the beginning of the scan range,
+# it will cause the last line of data from the previous scan range to be incomplete.
+select id, key, value from multiline_json
+---- TYPES
+int, string, string
+---- RESULTS
+1,'normal object','abcdefg'
+2,'multiline string','NULL'
+3,'multiline number','1234'
+4,'multiline object1','abcdefg'
+5,'multiline object2','abcdefg'
+6,'multiline object3','abcdefg'
+7,'multiline object4','abcdefg'
+8,'one line multiple objects','obj1'
+9,'one line multiple objects','obj2'
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test b/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test
new file mode 100644
index 000000000..d4afe584f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/overflow_json.test
@@ -0,0 +1,20 @@
+====
+---- QUERY
+# Tests overflow of numeric values in JSON. In JSON, numeric values can be stored as
+# either number or string types. Due to limitations in RapidJSON, numeric values that
+# overflow will result in a parsing error (kParseErrorNumberTooBig), even when using the
+# kParseNumbersAsStringsFlag. This parsing error is difficult to recover from, so we can
+# only fill it with NULL and report the error. However, for numbers stored as strings,
+# there is no such limitation, and they can be passed to the TextConverter for further
+# processing.
+select tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col from overflow_json
+---- TYPES
+tinyint, smallint, int, bigint, float, double
+---- RESULTS
+1,2,3,4,5.5,6.6
+127,32767,2147483647,9223372036854775807,NULL,NULL
+-128,-32768,-2147483648,-9223372036854775808,NULL,NULL
+1,2,3,4,5.5,6.6
+127,32767,2147483647,9223372036854775807,Infinity,Infinity
+-128,-32768,-2147483648,-9223372036854775808,-Infinity,-Infinity
+====
diff --git a/testdata/workloads/tpcds/tpcds_core.csv b/testdata/workloads/tpcds/tpcds_core.csv
index 48cc97dae..615e6677a 100644
--- a/testdata/workloads/tpcds/tpcds_core.csv
+++ b/testdata/workloads/tpcds/tpcds_core.csv
@@ -3,3 +3,4 @@ file_format: text, dataset: tpcds, compression_codec: none, compression_type: no
 file_format: seq, dataset: tpcds, compression_codec: snap, compression_type: block
 file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
+file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/tpcds/tpcds_exhaustive.csv b/testdata/workloads/tpcds/tpcds_exhaustive.csv
index 6cb3b9ba6..0ae2e5bac 100644
--- a/testdata/workloads/tpcds/tpcds_exhaustive.csv
+++ b/testdata/workloads/tpcds/tpcds_exhaustive.csv
@@ -23,3 +23,4 @@ file_format: parquet, dataset: tpcds, compression_codec: snap, compression_type:
 file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block
+file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/tpcds/tpcds_pairwise.csv b/testdata/workloads/tpcds/tpcds_pairwise.csv
index 7d4515d5e..d9045443d 100644
--- a/testdata/workloads/tpcds/tpcds_pairwise.csv
+++ b/testdata/workloads/tpcds/tpcds_pairwise.csv
@@ -15,3 +15,4 @@ file_format: rc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block
+file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/tpch/tpch_core.csv b/testdata/workloads/tpch/tpch_core.csv
index 024063cfe..4d7193ad4 100644
--- a/testdata/workloads/tpch/tpch_core.csv
+++ b/testdata/workloads/tpch/tpch_core.csv
@@ -9,3 +9,4 @@ file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
 file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
 file_format:orc, dataset:tpch, compression_codec: def, compression_type: block
 file_format:kudu, dataset:tpch, compression_codec: none, compression_type: none
+file_format:json, dataset:tpch, compression_codec:none, compression_type:none
diff --git a/testdata/workloads/tpch/tpch_dimensions.csv b/testdata/workloads/tpch/tpch_dimensions.csv
index 57e0dd49d..34b81aaa3 100644
--- a/testdata/workloads/tpch/tpch_dimensions.csv
+++ b/testdata/workloads/tpch/tpch_dimensions.csv
@@ -1,4 +1,4 @@
-file_format: text,seq,rc,avro,parquet,orc,kudu
+file_format: text,seq,rc,avro,parquet,orc,kudu,json
 dataset: tpch
 compression_codec: none,def,gzip,bzip,snap
 compression_type: none,block,record
diff --git a/testdata/workloads/tpch/tpch_exhaustive.csv b/testdata/workloads/tpch/tpch_exhaustive.csv
index fffaa9259..fc1821a3b 100644
--- a/testdata/workloads/tpch/tpch_exhaustive.csv
+++ b/testdata/workloads/tpch/tpch_exhaustive.csv
@@ -25,3 +25,4 @@ file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none
+file_format: json, dataset: tpch, compression_codec: none, compression_type: none
diff --git a/testdata/workloads/tpch/tpch_pairwise.csv b/testdata/workloads/tpch/tpch_pairwise.csv
index e245e3b4c..62550f7e7 100644
--- a/testdata/workloads/tpch/tpch_pairwise.csv
+++ b/testdata/workloads/tpch/tpch_pairwise.csv
@@ -16,3 +16,4 @@ file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none
+file_format: json, dataset: tpch, compression_codec: none, compression_type: none
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 3f9bf23fa..0c5c2fef3 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -41,7 +41,8 @@ FILE_FORMAT_TO_STORED_AS_MAP = {
   'avro': 'AVRO',
   'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
   'kudu': "KUDU",
-  'iceberg': "ICEBERG"
+  'iceberg': "ICEBERG",
+  'json': "JSONFILE",
 }
 
 # Describes the configuration used to execute a single tests. Contains both the details
@@ -49,7 +50,7 @@ FILE_FORMAT_TO_STORED_AS_MAP = {
 # to use when running the query.
 class TableFormatInfo(object):
   KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase',
-                        'kudu', 'iceberg']
+                        'kudu', 'iceberg', 'json']
   KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'zstd', 'lz4']
   KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']
 
@@ -121,6 +122,12 @@ def create_uncompressed_text_dimension(workload):
       TableFormatInfo.create_from_string(dataset, 'text/none'))
 
 
+def create_uncompressed_json_dimension(workload):
+  dataset = get_dataset_from_workload(workload)
+  return ImpalaTestDimension('table_format',
+      TableFormatInfo.create_from_string(dataset, 'json/none'))
+
+
 def create_parquet_dimension(workload):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
diff --git a/tests/custom_cluster/test_disable_features.py b/tests/custom_cluster/test_disable_features.py
index 83d0d4abb..3d01dba26 100644
--- a/tests/custom_cluster/test_disable_features.py
+++ b/tests/custom_cluster/test_disable_features.py
@@ -59,3 +59,8 @@ class TestDisableFeatures(CustomClusterTestSuite):
     self.execute_query_expect_failure(
         self.client,
         "select sum(id) over(order by id) from functional.alltypes having -1")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--enable_json_scanner=false")
+  def test_disable_json_scanner(self, vector):
+    self.run_test_case('QueryTest/disable-json-scanner', vector)
diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py
index b0139e7ec..147cf5380 100644
--- a/tests/data_errors/test_data_errors.py
+++ b/tests/data_errors/test_data_errors.py
@@ -154,6 +154,19 @@ class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors):
     self.run_test_case('DataErrorsTest/hdfs-rcfile-scan-node-errors', vector)
 
 
+@SkipIfFS.qualified_path
+class TestHdfsJsonScanNodeErrors(TestHdfsScanNodeErrors):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsJsonScanNodeErrors, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'json')
+
+  def test_hdfs_json_scan_node_errors(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self.run_test_case('DataErrorsTest/hdfs-json-scan-node-errors', vector)
+
+
 class TestAvroErrors(TestDataErrors):
   @classmethod
   def add_test_dimensions(cls):
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index d1cbd4cc7..42b02884a 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -121,29 +121,6 @@ class TestHmsIntegrationSanity(ImpalaTestSuite):
     self.client.execute("DESCRIBE {0}.json_tbl"
                         .format(unique_database))
 
-  def test_json_file_unsupported(self, unique_database):
-    """
-    Since JSON file format is not yet supported,this function tests
-    the blocking logic of reading JSON tables.
-    """
-    self.client.execute("create table {0}.json_tbl(id int, name string, age int)"
-                        " stored as jsonfile".format(unique_database))
-    self.run_stmt_in_hive("insert  into {0}.json_tbl values(0,'Alice',10)"
-                          .format(unique_database))
-    self.run_stmt_in_hive("insert  into {0}.json_tbl values(1,'Bob',20)"
-                          .format(unique_database))
-    self.run_stmt_in_hive("insert  into {0}.json_tbl values(2,'Oracle',16)"
-                          .format(unique_database))
-    self.client.execute("refresh {0}.json_tbl".format(unique_database))
-    self.client.execute("show files in {0}.json_tbl".format(unique_database))
-    try:
-      self.client.execute("select * from {0}.json_tbl".format(unique_database))
-    except Exception as e:
-      assert 'Scan of table {0}.json_tbl in format \'JSON\' is not supported.'\
-        .format(unique_database) in str(e)
-    else:
-      assert False
-
   def test_invalidate_metadata(self, unique_name):
     """Verify invalidate metadata on tables under unloaded db won't fail"""
     db = unique_name + "_db"
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index 63c43e383..5a5f504ce 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -110,8 +110,8 @@ class TestCancellation(ImpalaTestSuite):
 
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('query_type') != 'CTAS' or (\
-            v.get_value('table_format').file_format in ['text', 'parquet', 'kudu'] and\
-            v.get_value('table_format').compression_codec == 'none'))
+            v.get_value('table_format').file_format in ['text', 'parquet', 'kudu', 'json']
+            and v.get_value('table_format').compression_codec == 'none'))
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('exec_option')['batch_size'] == 0)
     # Ignore 'compute stats' queries for the CTAS query type.
@@ -247,6 +247,9 @@ class TestCancellationSerial(TestCancellation):
 
   @pytest.mark.execute_serially
   def test_cancel_insert(self, vector):
+    if vector.get_value('table_format').file_format == 'json':
+      # Insert into json format is not supported yet.
+      pytest.skip()
     self.execute_cancel_test(vector)
     metric_verifier = MetricVerifier(self.impalad_test_service)
     metric_verifier.verify_no_open_files(timeout=10)
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index 6f2176600..25d06c9ac 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -35,9 +35,9 @@ class TestStringQueries(ImpalaTestSuite):
     super(TestStringQueries, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
       create_exec_option_dimension(disable_codegen_options=[False, True]))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format in ['text'] and
-        v.get_value('table_format').compression_codec in ['none'])
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['text', 'json']
+        and v.get_value('table_format').compression_codec in ['none'])
     # Run these queries through both beeswax and HS2 to get coverage of CHAR/VARCHAR
     # returned via both protocols.
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
@@ -80,8 +80,8 @@ class TestCharFormats(ImpalaTestSuite):
         v.get_value('table_format').compression_codec in ['snap']) or
         v.get_value('table_format').file_format in ['parquet'] or
         v.get_value('table_format').file_format in ['orc'] or
-        (v.get_value('table_format').file_format in ['text'] and
-        v.get_value('table_format').compression_codec in ['none']))
+        (v.get_value('table_format').file_format in ['text', 'json']
+         and v.get_value('table_format').compression_codec in ['none']))
     # Run these queries through both beeswax and HS2 to get coverage of CHAR/VARCHAR
     # returned via both protocols.
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
diff --git a/tests/query_test/test_date_queries.py b/tests/query_test/test_date_queries.py
index ad51e4b03..c69392cd3 100644
--- a/tests/query_test/test_date_queries.py
+++ b/tests/query_test/test_date_queries.py
@@ -40,9 +40,10 @@ class TestDateQueries(ImpalaTestSuite):
         'batch_size': [0, 1],
         'disable_codegen': ['false', 'true'],
         'disable_codegen_rows_threshold': [0]}))
-    # DATE type is only supported for text, parquet and avro fileformat on HDFS and HBASE.
+    # DATE type is only supported for text, parquet, avro, orc and json fileformat on HDFS
+    # and HBASE.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('text', 'hbase', 'parquet')
+        v.get_value('table_format').file_format in ('text', 'hbase', 'parquet', 'json')
         or (v.get_value('table_format').file_format == 'avro'
             and v.get_value('table_format').compression_codec == 'snap'))
 
diff --git a/tests/query_test/test_decimal_queries.py b/tests/query_test/test_decimal_queries.py
index 7ed102275..4f77f6faa 100644
--- a/tests/query_test/test_decimal_queries.py
+++ b/tests/query_test/test_decimal_queries.py
@@ -48,7 +48,7 @@ class TestDecimalQueries(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(lambda v:\
         (v.get_value('table_format').file_format == 'text' and
          v.get_value('table_format').compression_codec == 'none') or
-         v.get_value('table_format').file_format in ['parquet', 'orc', 'kudu'])
+         v.get_value('table_format').file_format in ['parquet', 'orc', 'kudu', 'json'])
 
     # Run these queries through both beeswax and HS2 to get coverage of decimals returned
     # via both protocols.
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 8e7a1556c..1c7d9de67 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -26,9 +26,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import (
     SkipIfEC, SkipIfCatalogV2, SkipIfNotHdfsMinicluster, SkipIfFS)
 from tests.common.test_dimensions import (
-    create_uncompressed_text_dimension, create_exec_option_dimension_from_dict,
-    create_client_protocol_dimension, hs2_parquet_constraint,
-    extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
+    create_uncompressed_text_dimension, create_uncompressed_json_dimension,
+    create_exec_option_dimension_from_dict, create_client_protocol_dimension,
+    hs2_parquet_constraint, extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
 from tests.util.filesystem_utils import get_fs_path
 from subprocess import check_call
 
@@ -258,6 +258,36 @@ class TestQueriesTextTables(ImpalaTestSuite):
   def test_values(self, vector):
     self.run_test_case('QueryTest/values', vector)
 
+
+# Tests in this class are only run against json/none either because that's the only
+# format that is supported, or the tests don't exercise the file format.
+class TestQueriesJsonTables(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestQueriesJsonTables, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_json_dimension(cls.get_workload()))
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_complex(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self.run_test_case('QueryTest/complex_json', vector)
+
+  def test_multiline(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self.run_test_case('QueryTest/multiline_json', vector)
+
+  def test_malformed(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self.run_test_case('QueryTest/malformed_json', vector)
+
+  def test_overflow(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self.run_test_case('QueryTest/overflow_json', vector)
+
 # Tests in this class are only run against Parquet because the tests don't exercise the
 # file format.
 class TestQueriesParquetTables(ImpalaTestSuite):
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 1def9b521..c5e2dbb9e 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1930,6 +1930,8 @@ class TestBinaryType(ImpalaTestSuite):
       lambda v: v.get_value('table_format').file_format != 'kudu')
 
   def test_binary_type(self, vector):
+    if vector.get_value('table_format').file_format == 'json':
+      pytest.skip()
     self.run_test_case('QueryTest/binary-type', vector)
 
 
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index adea46bae..539d4ed87 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -77,8 +77,8 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # TODO: enable for more table formats once they consistently pass the fuzz test.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ('avro', 'parquet', 'orc') or
-        (v.get_value('table_format').file_format == 'text' and
-          v.get_value('table_format').compression_codec in ('none')))
+        (v.get_value('table_format').file_format in ('text', 'json')
+          and v.get_value('table_format').compression_codec in ('none')))
 
 
   def test_fuzz_alltypes(self, vector, unique_database):
diff --git a/tests/query_test/test_tpch_queries.py b/tests/query_test/test_tpch_queries.py
index 201500a2d..4d2bec912 100644
--- a/tests/query_test/test_tpch_queries.py
+++ b/tests/query_test/test_tpch_queries.py
@@ -38,7 +38,8 @@ class TestTpchQuery(ImpalaTestSuite):
     # TODO: the planner tests are based on text and need this.
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:\
-          v.get_value('table_format').file_format in ['text', 'parquet', 'kudu', 'orc'])
+          v.get_value('table_format').file_format in ['text', 'parquet', 'kudu', 'orc',
+                                                      'json'])
 
   def idfn(val):
     return "TPC-H: Q{0}".format(val)