Posted to commits@impala.apache.org by st...@apache.org on 2024/01/18 08:35:18 UTC

(impala) branch master updated: IMPALA-12431: Support reading compressed JSON file

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new bb1ec12c1 IMPALA-12431: Support reading compressed JSON file
bb1ec12c1 is described below

commit bb1ec12c1f9deec25db6dde0a4506940d2970595
Author: Eyizoha <ey...@163.com>
AuthorDate: Wed Sep 13 16:55:47 2023 +0800

    IMPALA-12431: Support reading compressed JSON file
    
    This patch adds support for reading compressed JSON files in the JSON
    scanner. Because the decompression code can largely be reused from
    HdfsTextScanner, this patch moves that part of the code from
    HdfsTextScanner to HdfsScanner so that HdfsJsonScanner can also call it.
    Since it reuses the relevant code from the text scanner, the JSON
    scanner supports the same compression formats as the text scanner.
    
    Tests
     - Most of the existing end-to-end JSON format tests can also run on
       compressed JSON files.
    
    Change-Id: I2471855d97d4cdd51363b321055e6b06aa6d81e8
    Reviewed-on: http://gerrit.cloudera.org:8080/20482
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
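
For orientation, the decompression dispatch that this refactor gives both
scanners looks roughly like the sketch below. It is a simplified, standalone
illustration rather than code from the commit: ScannerSketch, Codec and
FillNextBuffer() are hypothetical stand-ins for the HdfsScanner helpers
(DecompressStreamToBuffer() and DecompressFileToBuffer()) that the text and
JSON scanners now share.

    // Illustrative sketch only: simplified stand-ins for the decompression
    // dispatch that HdfsScanner now provides to the TEXT and JSON scanners.
    #include <iostream>

    enum class Codec { NONE, GZIP, BZIP2, SNAPPY_BLOCKED };

    // Streaming is possible for codecs that can decode a partial input buffer.
    static bool SupportsStreaming(Codec c) {
      return c == Codec::GZIP || c == Codec::BZIP2;
    }

    // Hypothetical scanner shell; the real members live on impala::HdfsScanner.
    struct ScannerSketch {
      Codec codec;

      // Mirrors the shape of HdfsJsonScanner::GetNextBuffer() after this patch:
      // plain reads when no decompressor is attached, streaming decompression
      // when the codec supports it, whole-file decompression otherwise.
      void FillNextBuffer() const {
        if (codec == Codec::NONE) {
          std::cout << "read raw bytes from the scan range\n";
        } else if (SupportsStreaming(codec)) {
          std::cout << "DecompressStreamToBuffer(): decode buffer by buffer\n";
        } else {
          std::cout << "DecompressFileToBuffer(): read and decode the whole file\n";
        }
      }
    };

    int main() {
      for (Codec c : {Codec::NONE, Codec::GZIP, Codec::SNAPPY_BLOCKED}) {
        ScannerSketch s{c};
        s.FillNextBuffer();
      }
      return 0;
    }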
---
 be/src/exec/hdfs-scanner.cc                        | 145 ++++++++++++++++++
 be/src/exec/hdfs-scanner.h                         |  20 +++
 be/src/exec/json/hdfs-json-scanner.cc              |  96 ++++++++----
 be/src/exec/json/hdfs-json-scanner.h               |  10 +-
 be/src/exec/text/hdfs-text-scanner.cc              | 162 +--------------------
 be/src/exec/text/hdfs-text-scanner.h               |  21 +--
 .../org/apache/impala/planner/HdfsScanNode.java    |   5 -
 .../functional-query_exhaustive.csv                |   4 +
 testdata/workloads/tpcds/tpcds_core.csv            |   2 +-
 testdata/workloads/tpcds/tpcds_exhaustive.csv      |   2 +-
 testdata/workloads/tpcds/tpcds_pairwise.csv        |   2 +-
 testdata/workloads/tpch/tpch_core.csv              |   2 +-
 testdata/workloads/tpch/tpch_exhaustive.csv        |   2 +-
 testdata/workloads/tpch/tpch_pairwise.csv          |   2 +-
 tests/query_test/test_compressed_formats.py        |   2 +-
 15 files changed, 259 insertions(+), 218 deletions(-)

diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index de3807bec..e17b8d183 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -24,6 +24,7 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/read-write-util.h"
+#include "exec/scanner-context.inline.h"
 #include "exec/text-converter.h"
 #include "exec/text-converter.inline.h"
 #include "exprs/scalar-expr-evaluator.h"
@@ -790,6 +791,150 @@ Status HdfsScanner::UpdateDecompressor(const string& codec) {
   return Status::OK();
 }
 
+Status HdfsScanner::DecompressFileToBuffer(uint8_t** buffer, int64_t* bytes_read) {
+  // For other compressed files: attempt to read and decompress the entire file, point
+  // to the decompressed buffer, and then continue normal processing.
+  DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
+  const HdfsFileDesc* desc = scan_node_->GetFileDesc(
+      context_->partition_descriptor()->id(), stream_->filename());
+  int64_t file_size = desc->file_length;
+  DCHECK_GT(file_size, 0);
+
+  Status status;
+  if (!stream_->GetBytes(file_size, buffer, bytes_read, &status)) {
+    DCHECK(!status.ok());
+    return status;
+  }
+
+  // If we didn't read anything, return.
+  if (*bytes_read == 0) return Status::OK();
+
+  // Need to read the entire file.
+  if (file_size > *bytes_read) {
+    return Status(Substitute("Expected to read a compressed text file of size $0 bytes. "
+        "But only read $1 bytes. This may indicate data file corruption. (file: $3).",
+        file_size, *bytes_read, stream_->filename()));
+  }
+
+  // Decompress and adjust the buffer and bytes_read accordingly.
+  int64_t decompressed_len = 0;
+  uint8_t* decompressed_buffer = nullptr;
+  SCOPED_TIMER(decompress_timer_);
+  // TODO: Once the writers are in, add tests with very large compressed files (4GB)
+  // that could overflow.
+  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, *bytes_read, *buffer,
+      &decompressed_len, &decompressed_buffer));
+
+  // Inform 'stream_' that the buffer with the compressed text can be released.
+  context_->ReleaseCompletedResources(true);
+
+  VLOG_FILE << "Decompressed " << *bytes_read << " to " << decompressed_len;
+  *buffer = decompressed_buffer;
+  *bytes_read = decompressed_len;
+  return Status::OK();
+}
+
+Status HdfsScanner::DecompressStreamToBuffer(uint8_t** buffer, int64_t* bytes_read,
+    MemPool* pool, bool* eosr) {
+  // We're about to create a new decompression buffer (if we can't reuse). Attach the
+  // memory from previous decompression rounds to 'pool'.
+  if (!decompressor_->reuse_output_buffer()) {
+    if (pool != nullptr) {
+      pool->AcquireData(data_buffer_pool_.get(), false);
+    } else {
+      data_buffer_pool_->FreeAll();
+    }
+  }
+
+  uint8_t* decompressed_buffer = nullptr;
+  int64_t decompressed_len = 0;
+  // Set bytes_to_read = -1 because we don't know how much data the decompressor needs.
+  // Just read the first available buffer within the scan range.
+  Status status = DecompressStream(-1, &decompressed_buffer, &decompressed_len,
+      eosr);
+  if (status.code() == TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS) {
+    // It's possible (but very unlikely) that ProcessBlockStreaming() wasn't able to
+    // make progress if the compressed buffer returned by GetBytes() is too small.
+    // (Note that this did not even occur in simple experiments where the input buffer
+    // is always 1 byte, but we need to handle this case to be defensive.) In this
+    // case, try again with a reasonably large fixed size buffer. If we still did not
+    // make progress, then return an error.
+    LOG(INFO) << status.GetDetail();
+    // Number of bytes to read when the previous attempt at streaming decompression did
+    // not make progress.
+    constexpr int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024;
+    status = DecompressStream(COMPRESSED_DATA_FIXED_READ_SIZE, &decompressed_buffer,
+        &decompressed_len, eosr);
+  }
+  RETURN_IF_ERROR(status);
+  *buffer = decompressed_buffer;
+  *bytes_read = decompressed_len;
+
+  if (*eosr) {
+    DCHECK(stream_->eosr());
+    context_->ReleaseCompletedResources(true);
+  }
+
+  return Status::OK();
+}
+
+Status HdfsScanner::DecompressStream(int64_t bytes_to_read,
+    uint8_t** decompressed_buffer, int64_t* decompressed_len, bool *eosr) {
+  // Some decompressors, such as the Bzip2 API (version 0.9 and later) and Gzip, can
+  // decompress buffers that are read from stream_, so we don't need to read the
+  // whole file at once. A compressed buffer is passed to ProcessBlockStreaming
+  // but it may not consume all of the input.
+  uint8_t* compressed_buffer_ptr = nullptr;
+  int64_t compressed_buffer_size = 0;
+  // We don't know how many bytes ProcessBlockStreaming() will consume so we set
+  // peek=true and then later advance the stream using SkipBytes().
+  if (bytes_to_read == -1) {
+    RETURN_IF_ERROR(stream_->GetBuffer(true, &compressed_buffer_ptr,
+        &compressed_buffer_size));
+  } else {
+    DCHECK_GT(bytes_to_read, 0);
+    Status status;
+    if (!stream_->GetBytes(bytes_to_read, &compressed_buffer_ptr, &compressed_buffer_size,
+        &status, true)) {
+      DCHECK(!status.ok());
+      return status;
+    }
+  }
+  int64_t compressed_buffer_bytes_read = 0;
+  bool stream_end = false;
+  {
+    SCOPED_TIMER(decompress_timer_);
+    Status status = decompressor_->ProcessBlockStreaming(compressed_buffer_size,
+        compressed_buffer_ptr, &compressed_buffer_bytes_read, decompressed_len,
+        decompressed_buffer, &stream_end);
+    if (!status.ok()) {
+      status.AddDetail(Substitute("$0file=$1, offset=$2", status.GetDetail(),
+          stream_->filename(), stream_->file_offset()));
+      return status;
+    }
+    DCHECK_GE(compressed_buffer_size, compressed_buffer_bytes_read);
+  }
+  // Skip the bytes in stream_ that were decompressed.
+  Status status;
+  if (!stream_->SkipBytes(compressed_buffer_bytes_read, &status)) {
+    DCHECK(!status.ok());
+    return status;
+  }
+
+  if (stream_->eosr()) {
+    if (stream_end) {
+      *eosr = true;
+    } else {
+      return Status(TErrorCode::COMPRESSED_FILE_TRUNCATED, stream_->filename());
+    }
+  } else if (*decompressed_len == 0) {
+    return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS,
+        stream_->filename());
+  }
+
+  return Status::OK();
+}
+
 bool HdfsScanner::ReportTupleParseError(FieldLocation* fields, uint8_t* errors) {
   for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
     if (errors[i]) {
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 007a567a7..1d277a300 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -531,6 +531,26 @@ class HdfsScanner {
   Status UpdateDecompressor(const THdfsCompression::type& compression) WARN_UNUSED_RESULT;
   Status UpdateDecompressor(const std::string& codec) WARN_UNUSED_RESULT;
 
+  /// Fills bytes into 'buffer' from the compressed data in 'stream_' by reading the
+  /// entire file, decompressing it, and setting 'buffer' to the decompressed data.
+  Status DecompressFileToBuffer(uint8_t** buffer, int64_t* bytes_read) WARN_UNUSED_RESULT;
+
+  /// Fills bytes into 'buffer' from the compressed data in 'stream_'. Unlike
+  /// DecompressFileToBuffer(), the entire file does not need to be read at once.
+  /// Buffers from 'stream_' are decompressed as they are read and 'buffer' is set to
+  /// available decompressed data.
+  /// Attaches decompression buffers from previous calls that might still be referenced
+  /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead.
+  Status DecompressStreamToBuffer(uint8_t** buffer, int64_t* bytes_read, MemPool* pool,
+      bool* eosr) WARN_UNUSED_RESULT;
+
+  /// Used by DecompressStreamToBuffer() to decompress data from 'stream_'.
+  /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input.
+  /// If bytes_to_read > 0, will read specified size.
+  /// If bytes_to_read = -1, will call GetBuffer().
+  Status DecompressStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
+      int64_t* decompressed_len, bool *eosr) WARN_UNUSED_RESULT;
+
   /// Utility function to report parse errors for each field.
   /// If errors[i] is nonzero, fields[i] had a parse error.
   /// Returns false if parsing should be aborted.  In this case parse_status_ is set
diff --git a/be/src/exec/json/hdfs-json-scanner.cc b/be/src/exec/json/hdfs-json-scanner.cc
index adc07f887..b0a5c5b16 100644
--- a/be/src/exec/json/hdfs-json-scanner.cc
+++ b/be/src/exec/json/hdfs-json-scanner.cc
@@ -68,6 +68,14 @@ Status HdfsJsonScanner::Open(ScannerContext* context) {
 
 void HdfsJsonScanner::Close(RowBatch* row_batch) {
   DCHECK(!is_closed_);
+  // Need to close the decompressor before transferring the remaining resources to
+  // 'row_batch' because in some cases there is memory allocated in the decompressor_'s
+  // temp_memory_pool_.
+  if (decompressor_ != nullptr) {
+    decompressor_->Close();
+    decompressor_.reset();
+  }
+
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     if (scan_node_->HasRowBatchQueue()) {
@@ -77,6 +85,10 @@ void HdfsJsonScanner::Close(RowBatch* row_batch) {
   } else {
     template_tuple_pool_->FreeAll();
   }
+  // The JsonParser always copies values instead of referencing them, so it doesn't
+  // reference the data in the data_buffer_pool_. Therefore, we don't need
+  // row_batch to acquire data from the data_buffer_pool_, and we can always
+  // call FreeAll().
   data_buffer_pool_->FreeAll();
   context_->ReleaseCompletedResources(true);
 
@@ -91,7 +103,21 @@ void HdfsJsonScanner::Close(RowBatch* row_batch) {
 Status HdfsJsonScanner::InitNewRange() {
   DCHECK_EQ(scanner_state_, CREATED);
 
-  // TODO: Optmize for empty projection.
+  auto compression_type = stream_->file_desc()->file_compression;
+  // Update the decompressor based on the compression type of the file in the context.
+  DCHECK(compression_type != THdfsCompression::SNAPPY)
+      << "FE should have generated SNAPPY_BLOCKED instead.";
+  // In Hadoop, text files compressed into .DEFLATE files contain
+  // deflate with zlib wrappings as opposed to raw deflate, which
+  // is what THdfsCompression::DEFLATE implies. Since deflate is
+  // the default compression algorithm used in Hadoop, it makes
+  // sense to map it to type DEFAULT in Impala instead.
+  if (compression_type == THdfsCompression::DEFLATE) {
+    compression_type = THdfsCompression::DEFAULT;
+  }
+  RETURN_IF_ERROR(UpdateDecompressor(compression_type));
+
+  // TODO: Optimize for zero-slot scans (e.g. count(*)).
   vector<string> schema;
   schema.reserve(scan_node_->materialized_slots().size());
   for (const SlotDescriptor* slot : scan_node_->materialized_slots()) {
@@ -231,9 +257,35 @@ static bool AllWhitespaceBeforeNewline(uint8_t* begin, int64_t len) {
   return false;
 }
 
+Status HdfsJsonScanner::FillBytesToBuffer(uint8_t** buffer, int64_t* bytes_read) {
+  if (scanner_state_ == PAST_SCANNING) {
+    // In the PAST_SCANNING state, we only read a small block of data at a time.
+    // If the parser completes the parsing of the last json object, it will exit the loop
+    // due to BreakParse().
+    Status status;
+    if (UNLIKELY(!stream_->GetBytes(NEXT_BLOCK_READ_SIZE, buffer, bytes_read, &status))) {
+      DCHECK(!status.ok());
+      return status;
+    }
+
+    // A special case is when the first character of the next scan range is a newline
+    // character (perhaps with other whitespace characters before it). Our scan should
+    // stop at the first newline character in the next range, while the parser skips
+    // whitespace characters. If we don't handle this case, the first line of the next
+    // range will be scanned twice. Therefore, we need to return directly here to inform
+    // the parser that eos has been reached.
+    if (UNLIKELY(AllWhitespaceBeforeNewline(*buffer, *bytes_read))) {
+      scanner_state_ = FINISHED;
+      *bytes_read = 0;
+    }
+  } else {
+    RETURN_IF_ERROR(stream_->GetBuffer(false, buffer, bytes_read));
+  }
+  return Status::OK();
+}
+
 void HdfsJsonScanner::GetNextBuffer(const char** begin, const char** end) {
   DCHECK(*begin == *end);
-  DCHECK(decompressor_.get() == nullptr) << "Not support decompress json yet.";
   SCOPED_TIMER(get_buffer_timer_);
 
   // The eosr indicates that we have scanned all data within the scan range. If the
@@ -249,32 +301,24 @@ void HdfsJsonScanner::GetNextBuffer(const char** begin, const char** end) {
 
   if (stream_->eof() || scanner_state_ == FINISHED) return;
 
-  uint8_t* next_buffer_begin;
-  int64_t next_buffer_size;
-  if (scanner_state_ == PAST_SCANNING) {
-    // In the PAST_SCANNING state, we only read a small block data at a time for scanning.
-    // If the parser completes the parsing of the last json object, it will exit the loop
-    // due to BreakParse().
-    if (!stream_->GetBytes(NEXT_BLOCK_READ_SIZE, &next_buffer_begin, &next_buffer_size,
-        &buffer_status_)) {
-      DCHECK(!buffer_status_.ok());
-      return;
-    }
-
-    // A special case is when the first character of the next scan range is a newline
-    // character (perhaps with other whitespace characters before it). Our scan should
-    // stop at the first newline character in the next range, while the parser skips
-    // whitespace characters. If we don't handle this case, the first line of the next
-    // range will be scanned twice. Therefore, we need to return directly here to inform
-    // the parser that eos has been reached.
-    if (AllWhitespaceBeforeNewline(next_buffer_begin, next_buffer_size)) {
-      scanner_state_ = FINISHED;
-      return;
-    }
+  uint8_t* next_buffer_begin = nullptr;
+  int64_t next_buffer_size = 0;
+  if (decompressor_ == nullptr) {
+    buffer_status_ = FillBytesToBuffer(&next_buffer_begin, &next_buffer_size);
+  } else if (decompressor_->supports_streaming()) {
+    bool eosr = false;
+    // The JsonParser always copies values instead of referencing them, so it doesn't
+    // reference the data in the data_buffer_pool_. Therefore, we don't need current_pool_
+    // to acquire data from the data_buffer_pool_, so we pass nullptr to
+    // DecompressStreamToBuffer().
+    buffer_status_ = DecompressStreamToBuffer(&next_buffer_begin, &next_buffer_size,
+        nullptr, &eosr);
   } else {
-    buffer_status_ = stream_->GetBuffer(false, &next_buffer_begin, &next_buffer_size);
-    RETURN_VOID_IF_ERROR(buffer_status_);
+    buffer_status_ = DecompressFileToBuffer(&next_buffer_begin, &next_buffer_size);
+    if (next_buffer_size == 0) scanner_state_ = FINISHED;
   }
+  RETURN_VOID_IF_ERROR(buffer_status_);
+  if (UNLIKELY(next_buffer_size == 0)) return;
 
   *begin = reinterpret_cast<char*>(next_buffer_begin);
   *end = *begin + next_buffer_size;
diff --git a/be/src/exec/json/hdfs-json-scanner.h b/be/src/exec/json/hdfs-json-scanner.h
index 47b3cc45d..08551b74c 100644
--- a/be/src/exec/json/hdfs-json-scanner.h
+++ b/be/src/exec/json/hdfs-json-scanner.h
@@ -79,10 +79,7 @@ class HdfsJsonScanner : public HdfsScanner {
   /// Return true if we have builtin support for scanning text files compressed with this
   /// codec.
   static bool HasBuiltinSupport(THdfsCompression::type compression) {
-    // TODO: Support scanning compressed json file.
-    DCHECK_EQ(compression, THdfsCompression::NONE);
-    if (compression == THdfsCompression::NONE) return true;
-    return false;
+    return HdfsTextScanner::HasBuiltinSupport(compression);
   }
 
  private:
@@ -107,6 +104,11 @@ class HdfsJsonScanner : public HdfsScanner {
   /// reports the error message. Returns false if necessary to abort the scan.
   bool HandleConvertError(const SlotDescriptor* desc, const char* data, int len);
 
+  /// Fills bytes to buffer from the context. If 'scanner_state_' is PAST_SCANNING, the
+  /// scanner will read a small block of data, otherwise it will just read whatever is
+  /// the io mgr buffer size.
+  Status FillBytesToBuffer(uint8_t** buffer, int64_t* bytes_read) WARN_UNUSED_RESULT;
+
   /// Scanner state, advances as the scanning process progresses.
   enum ScannerState {
     CREATED,
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index 0aae4faaf..3429d3a88 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -78,10 +78,6 @@ const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner";
 // Suffix for lzo index file: hdfs-filename.index
 const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index";
 
-// Number of bytes to read when the previous attempt to streaming decompress did not make
-// progress.
-const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024;
-
 HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : HdfsScanner(scan_node, state),
       byte_buffer_ptr_(nullptr),
@@ -529,166 +525,20 @@ Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes)
     *eosr = stream_->eosr();
   } else if (decompressor_->supports_streaming()) {
     DCHECK_EQ(num_bytes, 0);
-    RETURN_IF_ERROR(FillByteBufferCompressedStream(pool, eosr));
+    RETURN_IF_ERROR(DecompressStreamToBuffer(
+        reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), &byte_buffer_read_size_,
+        pool, eosr));
   } else {
     DCHECK_EQ(num_bytes, 0);
-    RETURN_IF_ERROR(FillByteBufferCompressedFile(eosr));
+    RETURN_IF_ERROR(DecompressFileToBuffer(reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
+        &byte_buffer_read_size_));
+    *eosr = byte_buffer_read_size_ == 0 ? true : stream_->eosr();
   }
 
   byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_;
   return Status::OK();
 }
 
-Status HdfsTextScanner::DecompressBufferStream(int64_t bytes_to_read,
-    uint8_t** decompressed_buffer, int64_t* decompressed_len, bool *eosr) {
-  // Some decompressors, such as Bzip2 API (version 0.9 and later) and Gzip can
-  // decompress buffers that are read from stream_, so we don't need to read the
-  // whole file in once. A compressed buffer is passed to ProcessBlockStreaming
-  // but it may not consume all of the input.
-  uint8_t* compressed_buffer_ptr = nullptr;
-  int64_t compressed_buffer_size = 0;
-  // We don't know how many bytes ProcessBlockStreaming() will consume so we set
-  // peek=true and then later advance the stream using SkipBytes().
-  if (bytes_to_read == -1) {
-    RETURN_IF_ERROR(stream_->GetBuffer(true, &compressed_buffer_ptr,
-        &compressed_buffer_size));
-  } else {
-    DCHECK_GT(bytes_to_read, 0);
-    Status status;
-    if (!stream_->GetBytes(bytes_to_read, &compressed_buffer_ptr, &compressed_buffer_size,
-        &status, true)) {
-      DCHECK(!status.ok());
-      return status;
-    }
-  }
-  int64_t compressed_buffer_bytes_read = 0;
-  bool stream_end = false;
-  {
-    SCOPED_TIMER(decompress_timer_);
-    Status status = decompressor_->ProcessBlockStreaming(compressed_buffer_size,
-        compressed_buffer_ptr, &compressed_buffer_bytes_read, decompressed_len,
-        decompressed_buffer, &stream_end);
-    if (!status.ok()) {
-      stringstream ss;
-      ss << status.GetDetail() << "file=" << stream_->filename()
-          << ", offset=" << stream_->file_offset();
-      status.AddDetail(ss.str());
-      return status;
-    }
-    DCHECK_GE(compressed_buffer_size, compressed_buffer_bytes_read);
-  }
-  // Skip the bytes in stream_ that were decompressed.
-  Status status;
-  if (!stream_->SkipBytes(compressed_buffer_bytes_read, &status)) {
-    DCHECK(!status.ok());
-    return status;
-  }
-
-  if (stream_->eosr()) {
-    if (stream_end) {
-      *eosr = true;
-    } else {
-      return Status(TErrorCode::COMPRESSED_FILE_TRUNCATED, stream_->filename());
-    }
-  } else if (*decompressed_len == 0) {
-    return Status(TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS,
-        stream_->filename());
-  }
-
-  return Status::OK();
-}
-
-Status HdfsTextScanner::FillByteBufferCompressedStream(MemPool* pool, bool* eosr) {
-  // We're about to create a new decompression buffer (if we can't reuse). Attach the
-  // memory from previous decompression rounds to 'pool'.
-  if (!decompressor_->reuse_output_buffer()) {
-    if (pool != nullptr) {
-      pool->AcquireData(data_buffer_pool_.get(), false);
-    } else {
-      data_buffer_pool_->FreeAll();
-    }
-  }
-
-  uint8_t* decompressed_buffer = nullptr;
-  int64_t decompressed_len = 0;
-  // Set bytes_to_read = -1 because we don't know how much data decompressor need.
-  // Just read the first available buffer within the scan range.
-  Status status = DecompressBufferStream(-1, &decompressed_buffer, &decompressed_len,
-      eosr);
-  if (status.code() == TErrorCode::COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS) {
-    // It's possible (but very unlikely) that ProcessBlockStreaming() wasn't able to
-    // make progress if the compressed buffer returned by GetBytes() is too small.
-    // (Note that this did not even occur in simple experiments where the input buffer
-    // is always 1 byte, but we need to handle this case to be defensive.) In this
-    // case, try again with a reasonably large fixed size buffer. If we still did not
-    // make progress, then return an error.
-    LOG(INFO) << status.GetDetail();
-    status = DecompressBufferStream(COMPRESSED_DATA_FIXED_READ_SIZE,
-        &decompressed_buffer, &decompressed_len, eosr);
-  }
-  RETURN_IF_ERROR(status);
-  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
-  byte_buffer_read_size_ = decompressed_len;
-
-  if (*eosr) {
-    DCHECK(stream_->eosr());
-    context_->ReleaseCompletedResources(true);
-  }
-
-  return Status::OK();
-}
-
-Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
-  // For other compressed text: attempt to read and decompress the entire file, point
-  // to the decompressed buffer, and then continue normal processing.
-  DCHECK(decompression_type_ != THdfsCompression::SNAPPY);
-  const HdfsFileDesc* desc = scan_node_->GetFileDesc(
-      context_->partition_descriptor()->id(), stream_->filename());
-  int64_t file_size = desc->file_length;
-  DCHECK_GT(file_size, 0);
-
-  Status status;
-  if (!stream_->GetBytes(file_size, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_),
-      &byte_buffer_read_size_, &status)) {
-    DCHECK(!status.ok());
-    return status;
-  }
-
-  // If didn't read anything, return.
-  if (byte_buffer_read_size_ == 0) {
-    *eosr = true;
-    return Status::OK();
-  }
-
-  // Need to read the entire file.
-  if (file_size > byte_buffer_read_size_) {
-    stringstream ss;
-    ss << "Expected to read a compressed text file of size " << file_size << " bytes. "
-       << "But only read " << byte_buffer_read_size_ << " bytes. This may indicate "
-       << "data file corruption. (file: " << stream_->filename() << ").";
-    return Status(ss.str());
-  }
-
-  // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ accordingly.
-  int64_t decompressed_len = 0;
-  uint8_t* decompressed_buffer = nullptr;
-  SCOPED_TIMER(decompress_timer_);
-  // TODO: Once the writers are in, add tests with very large compressed files (4GB)
-  // that could overflow.
-  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, byte_buffer_read_size_,
-      reinterpret_cast<uint8_t*>(byte_buffer_ptr_), &decompressed_len,
-      &decompressed_buffer));
-
-  // Inform 'stream_' that the buffer with the compressed text can be released.
-  context_->ReleaseCompletedResources(true);
-
-  VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
-  byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
-  byte_buffer_read_size_ = decompressed_len;
-  *eosr = stream_->eosr();
-  return Status::OK();
-}
-
 Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
   DCHECK_EQ(scan_state_, SCAN_RANGE_INITIALIZED);
 
diff --git a/be/src/exec/text/hdfs-text-scanner.h b/be/src/exec/text/hdfs-text-scanner.h
index 8bfbfe2e3..a6309c8c3 100644
--- a/be/src/exec/text/hdfs-text-scanner.h
+++ b/be/src/exec/text/hdfs-text-scanner.h
@@ -172,7 +172,7 @@ class HdfsTextScanner : public HdfsScanner {
   /// If num_bytes is 0, the scanner will read whatever is the io mgr buffer size,
   /// otherwise it will just read num_bytes. If we are reading compressed text, num_bytes
   /// must be 0. Internally, calls the appropriate streaming or non-streaming
-  /// decompression functions FillByteBufferCompressedFile/Stream().
+  /// decompression functions DecompressFileToBuffer()/DecompressStreamToBuffer().
   /// If applicable, attaches decompression buffers from previous calls that might still
   /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
   /// freed instead.
@@ -183,25 +183,6 @@ class HdfsTextScanner : public HdfsScanner {
   virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0)
       WARN_UNUSED_RESULT;
 
-  /// Fills the next byte buffer from the compressed data in stream_ by reading the entire
-  /// file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer.
-  Status FillByteBufferCompressedFile(bool* eosr) WARN_UNUSED_RESULT;
-
-  /// Fills the next byte buffer from the compressed data in stream_. Unlike
-  /// FillByteBufferCompressedFile(), the entire file does not need to be read at once.
-  /// Buffers from stream_ are decompressed as they are read and byte_buffer_ptr_ is set
-  /// to available decompressed data.
-  /// Attaches decompression buffers from previous calls that might still be referenced
-  /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead.
-  Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr) WARN_UNUSED_RESULT;
-
-  /// Used by FillByteBufferCompressedStream() to decompress data from 'stream_'.
-  /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input.
-  /// If bytes_to_read > 0, will read specified size.
-  /// If bytes_to_read = -1, will call GetBuffer().
-  Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer,
-      int64_t* decompressed_len, bool *eosr) WARN_UNUSED_RESULT;
-
   /// Checks if the current buffer ends with a row delimiter spanning this and the next
   /// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always
   /// returns false if the table's row delimiter is not '\n'. This can only be called
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index edb410962..80e36d6b5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -483,11 +483,6 @@ public class HdfsScanNode extends ScanNode {
         throw new NotImplementedException(
             "JSON scans are disabled by --enable_json_scanner flag.");
       }
-      for (FileDescriptor fd: part.getFileDescriptors()) {
-        if (fd.getFileCompression() == HdfsCompression.NONE) continue;
-        throw new NotImplementedException(
-            "Scanning compressed Json file is not implemented yet: " + fd.getPath());
-      }
     }
 
     Column firstComplexTypedCol = null;
diff --git a/testdata/workloads/functional-query/functional-query_exhaustive.csv b/testdata/workloads/functional-query/functional-query_exhaustive.csv
index 1740b1a58..690b56168 100644
--- a/testdata/workloads/functional-query/functional-query_exhaustive.csv
+++ b/testdata/workloads/functional-query/functional-query_exhaustive.csv
@@ -26,3 +26,7 @@ file_format: orc, dataset: functional, compression_codec: def, compression_type:
 file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
 file_format: kudu, dataset: functional, compression_codec: none, compression_type: none
 file_format: json, dataset: functional, compression_codec: none, compression_type: none
+file_format: json, dataset: functional, compression_codec: def, compression_type: block
+file_format: json, dataset: functional, compression_codec: gzip, compression_type: block
+file_format: json, dataset: functional, compression_codec: bzip, compression_type: block
+file_format: json, dataset: functional, compression_codec: snap, compression_type: block
diff --git a/testdata/workloads/tpcds/tpcds_core.csv b/testdata/workloads/tpcds/tpcds_core.csv
index 615e6677a..5373baa85 100644
--- a/testdata/workloads/tpcds/tpcds_core.csv
+++ b/testdata/workloads/tpcds/tpcds_core.csv
@@ -3,4 +3,4 @@ file_format: text, dataset: tpcds, compression_codec: none, compression_type: no
 file_format: seq, dataset: tpcds, compression_codec: snap, compression_type: block
 file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
-file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: json, dataset: tpcds, compression_codec: bzip, compression_type: block
diff --git a/testdata/workloads/tpcds/tpcds_exhaustive.csv b/testdata/workloads/tpcds/tpcds_exhaustive.csv
index 0ae2e5bac..cebbbeedd 100644
--- a/testdata/workloads/tpcds/tpcds_exhaustive.csv
+++ b/testdata/workloads/tpcds/tpcds_exhaustive.csv
@@ -23,4 +23,4 @@ file_format: parquet, dataset: tpcds, compression_codec: snap, compression_type:
 file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block
-file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: json, dataset: tpcds, compression_codec: bzip, compression_type: block
diff --git a/testdata/workloads/tpcds/tpcds_pairwise.csv b/testdata/workloads/tpcds/tpcds_pairwise.csv
index d9045443d..c023bff33 100644
--- a/testdata/workloads/tpcds/tpcds_pairwise.csv
+++ b/testdata/workloads/tpcds/tpcds_pairwise.csv
@@ -15,4 +15,4 @@ file_format: rc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block
-file_format: json, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: json, dataset: tpcds, compression_codec: bzip, compression_type: block
diff --git a/testdata/workloads/tpch/tpch_core.csv b/testdata/workloads/tpch/tpch_core.csv
index 4d7193ad4..67687469d 100644
--- a/testdata/workloads/tpch/tpch_core.csv
+++ b/testdata/workloads/tpch/tpch_core.csv
@@ -9,4 +9,4 @@ file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
 file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
 file_format:orc, dataset:tpch, compression_codec: def, compression_type: block
 file_format:kudu, dataset:tpch, compression_codec: none, compression_type: none
-file_format:json, dataset:tpch, compression_codec:none, compression_type:none
+file_format:json, dataset:tpch, compression_codec:gzip, compression_type:block
diff --git a/testdata/workloads/tpch/tpch_exhaustive.csv b/testdata/workloads/tpch/tpch_exhaustive.csv
index fc1821a3b..2e783945d 100644
--- a/testdata/workloads/tpch/tpch_exhaustive.csv
+++ b/testdata/workloads/tpch/tpch_exhaustive.csv
@@ -25,4 +25,4 @@ file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none
-file_format: json, dataset: tpch, compression_codec: none, compression_type: none
+file_format: json, dataset: tpch, compression_codec: gzip, compression_type: block
diff --git a/testdata/workloads/tpch/tpch_pairwise.csv b/testdata/workloads/tpch/tpch_pairwise.csv
index 62550f7e7..6d6063061 100644
--- a/testdata/workloads/tpch/tpch_pairwise.csv
+++ b/testdata/workloads/tpch/tpch_pairwise.csv
@@ -16,4 +16,4 @@ file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
 file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
 file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none
-file_format: json, dataset: tpch, compression_codec: none, compression_type: none
+file_format: json, dataset: tpch, compression_codec: gzip, compression_type: block
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 333fc349c..029638da0 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -155,7 +155,7 @@ class TestCompressedText(TestCompressedFormatsBase):
     super(TestCompressedText, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.clear()
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('file_format', *['text']))
+        ImpalaTestDimension('file_format', *['text', 'json']))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('compression_format', *compression_formats))