Posted to commits@impala.apache.org by tm...@apache.org on 2019/06/19 16:57:38 UTC

[impala] 03/05: IMPALA-8617: Add support for lz4 in parquet

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 97a6a3c8077affd5a181054d706cc7f30aecca91
Author: Abhishek <ar...@cloudera.com>
AuthorDate: Mon Jun 10 09:55:24 2019 -0700

    IMPALA-8617: Add support for lz4 in parquet
    
    A new enum value LZ4_BLOCKED was added to the THdfsCompression enum
    to distinguish it from the existing LZ4 codec. The LZ4_BLOCKED codec
    represents the block compression scheme used by Hadoop. It is
    similar to SNAPPY_BLOCKED as far as the block format is concerned,
    the only difference being the codec used for compression and
    decompression.
    
    Added Lz4BlockCompressor and Lz4BlockDecompressor classes for
    compressing and decompressing parquet data using Hadoop's
    lz4 block compression scheme.
    
    The Lz4BlockCompressor treats the input as a single block and
    generates a compressed block with the following layout:
      <4 byte big endian uncompressed size>
      <4 byte big endian compressed size>
      <lz4 compressed block>
    The HdfsParquetTableWriter should call the Lz4BlockCompressor with
    the ideal input size (the unit of compression in parquet is a page),
    so the Lz4BlockCompressor does not further break the input down
    into smaller blocks.
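    
    For illustration only, here is a minimal sketch of how one such
    block could be framed with the liblz4 API. The helper name
    Lz4BlockEncode and the byte packing below are assumptions for this
    sketch, not part of the commit; the actual implementation is
    Lz4BlockCompressor::ProcessBlock in the diff below.
    
      #include <lz4.h>
      #include <cstdint>
      #include <vector>
    
      // Frame one page as a single Hadoop-style LZ4 block:
      //   <4 byte BE uncompressed size><4 byte BE compressed size><lz4 data>
      std::vector<uint8_t> Lz4BlockEncode(const uint8_t* input, int input_len) {
        auto put_be32 = [](uint8_t* p, uint32_t v) {
          p[0] = static_cast<uint8_t>(v >> 24);
          p[1] = static_cast<uint8_t>(v >> 16);
          p[2] = static_cast<uint8_t>(v >> 8);
          p[3] = static_cast<uint8_t>(v);
        };
        std::vector<uint8_t> out(2 * sizeof(uint32_t) + LZ4_compressBound(input_len));
        put_be32(out.data(), input_len);            // uncompressed size, big endian
        // A return value of 0 from LZ4_compress_default indicates failure
        // (error handling omitted in this sketch).
        int compressed_len = LZ4_compress_default(
            reinterpret_cast<const char*>(input),
            reinterpret_cast<char*>(out.data() + 8),
            input_len, static_cast<int>(out.size()) - 8);
        put_be32(out.data() + 4, compressed_len);   // compressed size, big endian
        out.resize(2 * sizeof(uint32_t) + compressed_len);
        return out;
      }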
    
    The Lz4BlockDecompressor, on the other hand, must be compatible with
    blocks written by Impala and by other engines in the Hadoop
    ecosystem. It can decompress data in the following format (see the
    sketch after the layout):
      <4 byte big endian uncompressed size>
      <4 byte big endian compressed size>
      <lz4 compressed block>
      ...
      <4 byte big endian compressed size>
      <lz4 compressed block>
      ...
      <repeated until the uncompressed size from the outer block is consumed>
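    
    For illustration only, a minimal sketch of walking that nested
    format with the liblz4 API. The helper name Lz4BlockDecode and the
    error handling are assumptions for this sketch; the actual
    implementation is Lz4BlockDecompressor::ProcessBlock in the diff
    below.
    
      #include <lz4.h>
      #include <cstdint>
    
      // Decode a Hadoop-style LZ4 stream: each outer block starts with its
      // uncompressed size, followed by length-prefixed lz4-compressed chunks.
      // Returns the number of bytes written to 'out', or -1 on corrupt input.
      int64_t Lz4BlockDecode(const uint8_t* in, int64_t in_len,
                             uint8_t* out, int64_t out_cap) {
        auto get_be32 = [](const uint8_t* p) {
          return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) |
                 (uint32_t(p[2]) << 8) | uint32_t(p[3]);
        };
        int64_t out_len = 0;
        while (in_len > 0) {
          if (in_len < 4) return -1;             // truncated outer header
          int64_t remaining = get_be32(in);      // uncompressed size of outer block
          in += 4; in_len -= 4;
          while (remaining > 0) {
            if (in_len < 4) return -1;           // truncated chunk header
            uint32_t chunk_len = get_be32(in);   // size of next inner lz4 chunk
            in += 4; in_len -= 4;
            if (chunk_len == 0 || chunk_len > in_len) return -1;
            int n = LZ4_decompress_safe(reinterpret_cast<const char*>(in),
                reinterpret_cast<char*>(out + out_len),
                static_cast<int>(chunk_len), static_cast<int>(out_cap - out_len));
            if (n < 0) return -1;                // corrupt chunk
            out_len += n; remaining -= n;
            in += chunk_len; in_len -= chunk_len;
          }
        }
        return out_len;
      }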
    
    Externally, users can now set the lz4 codec for parquet using:
      set COMPRESSION_CODEC=lz4
    This gets translated into the LZ4_BLOCKED codec for the
    HdfsParquetTableWriter. Similarly, when reading lz4-compressed
    parquet data, the LZ4_BLOCKED codec is used.
    
    Testing:
     - Added unit tests for LZ4_BLOCKED in decompress-test.cc
     - Added unit tests for Hadoop compatibility in decompress-test.cc,
       i.e. decompressing an outer block that contains multiple inner
       blocks (the Lz4BlockDecompressor format described above)
     - Added interoperability tests between Hive and Impala for all
       parquet codecs. The new test is in
       tests/custom_cluster/test_hive_parquet_codec_interop.py
    
    Change-Id: Ia6850a39ef3f1e0e7ba48e08eef1d4f7cbb74d0c
    Reviewed-on: http://gerrit.cloudera.org:8080/13582
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 131 ++++++++++++++-------
 be/src/exec/parquet/parquet-column-readers.cc      |  19 ++-
 be/src/exec/parquet/parquet-common.cc              |   9 +-
 be/src/exec/parquet/parquet-metadata-utils.cc      |   3 +-
 be/src/service/query-options-test.cc               |   2 +-
 be/src/util/codec.cc                               |   8 +-
 be/src/util/compress.cc                            |  45 +++++++
 be/src/util/compress.h                             |  14 +++
 be/src/util/decompress-test.cc                     |  77 +++++++++++-
 be/src/util/decompress.cc                          |  64 ++++++++++
 be/src/util/decompress.h                           |  14 +++
 common/thrift/CatalogObjects.thrift                |   1 +
 common/thrift/generate_error_codes.py              |  13 ++
 .../functional-query/queries/QueryTest/set.test    |   2 +-
 tests/common/test_dimensions.py                    |   2 +-
 .../test_hive_parquet_codec_interop.py             |  91 ++++++++++++++
 tests/query_test/test_insert.py                    |   2 +-
 tests/query_test/test_insert_parquet.py            |   2 +-
 18 files changed, 440 insertions(+), 59 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 1a6aaf7..00ded5c 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -94,7 +94,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
  public:
   // expr - the expression to generate output values for this column.
   BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval,
-      const Codec::CodecInfo& codec_info)
+      const Codec::CodecInfo& codec_info, const string column_name)
     : parent_(parent),
       expr_eval_(expr_eval),
       codec_info_(codec_info),
@@ -108,7 +108,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
       values_buffer_len_(DEFAULT_DATA_PAGE_SIZE),
       page_stats_base_(nullptr),
       row_group_stats_base_(nullptr),
-      table_sink_mem_tracker_(parent_->parent_->mem_tracker()) {
+      table_sink_mem_tracker_(parent_->parent_->mem_tracker()),
+      column_name_(std::move(column_name)) {
     static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value,
         "'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink");
     def_levels_ = parent_->state_->obj_pool()->Add(
@@ -205,6 +206,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   parquet::CompressionCodec::type GetParquetCodec() const {
     return ConvertImpalaToParquetCodec(codec_info_.format_);
   }
+  const std::string& column_name() { return column_name_; }
 
  protected:
   friend class HdfsParquetTableWriter;
@@ -393,6 +395,9 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   // True, if we should write the page index.
   bool write_page_index_;
+
+  // Column name in the HdfsTableDescriptor.
+  const string column_name_;
 };
 
 // Per type column writer.
@@ -401,8 +406,8 @@ class HdfsParquetTableWriter::ColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-      const Codec::CodecInfo& codec_info)
-    : BaseColumnWriter(parent, eval, codec_info),
+      const Codec::CodecInfo& codec_info, const std::string& col_name)
+    : BaseColumnWriter(parent, eval, codec_info, col_name),
       num_values_since_dict_size_check_(0),
       plain_encoded_value_size_(
           ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
@@ -521,8 +526,8 @@ class HdfsParquetTableWriter::BoolColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
   BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-      const Codec::CodecInfo& codec_info)
-    : BaseColumnWriter(parent, eval, codec_info),
+      const Codec::CodecInfo& codec_info, const std::string& col_name)
+    : BaseColumnWriter(parent, eval, codec_info, col_name),
       page_stats_(parent_->reusable_col_mem_pool_.get(), -1),
       row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) {
     DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN);
@@ -577,8 +582,8 @@ class HdfsParquetTableWriter::Int64TimestampColumnWriterBase :
     public HdfsParquetTableWriter::ColumnWriter<int64_t> {
 public:
  Int64TimestampColumnWriterBase(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-     const Codec::CodecInfo& codec_info)
-   : HdfsParquetTableWriter::ColumnWriter<int64_t>(parent, eval, codec_info) {
+     const Codec::CodecInfo& codec_info, const std::string& col_name)
+   : HdfsParquetTableWriter::ColumnWriter<int64_t>(parent, eval, codec_info, col_name) {
    int64_t dummy;
    plain_encoded_value_size_ = ParquetPlainEncoder::ByteSize(dummy);
   }
@@ -604,8 +609,10 @@ class HdfsParquetTableWriter::Int64MilliTimestampColumnWriter :
     public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
 public:
  Int64MilliTimestampColumnWriter(HdfsParquetTableWriter* parent,
-     ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info)
-   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
+     ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info,
+     const std::string& col_name)
+   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(
+       parent, eval, codec_info, col_name) {}
 
 protected:
   virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -618,8 +625,10 @@ class HdfsParquetTableWriter::Int64MicroTimestampColumnWriter :
     public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
 public:
  Int64MicroTimestampColumnWriter(HdfsParquetTableWriter* parent,
-     ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info)
-   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
+     ScalarExprEvaluator* eval, const Codec::CodecInfo& codec_info,
+     const std::string& col_name)
+   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(
+       parent, eval, codec_info, col_name) {}
 
 protected:
   virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -633,8 +642,9 @@ class HdfsParquetTableWriter::Int64NanoTimestampColumnWriter :
     public HdfsParquetTableWriter::Int64TimestampColumnWriterBase {
 public:
  Int64NanoTimestampColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
-     const Codec::CodecInfo& codec_info)
-   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(parent, eval, codec_info) {}
+     const Codec::CodecInfo& codec_info, const std::string& col_name)
+   : HdfsParquetTableWriter::Int64TimestampColumnWriterBase(
+       parent, eval, codec_info, col_name) {}
 
 protected:
   virtual bool ConvertTimestamp(const TimestampValue& ts, int64_t* result) {
@@ -756,8 +766,13 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
       uint8_t* compressed_data =
           parent_->per_file_mem_pool_->Allocate(max_compressed_size);
       header.compressed_page_size = max_compressed_size;
-      RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
-          dict_buffer, &header.compressed_page_size, &compressed_data));
+      const Status& status =
+          compressor_->ProcessBlock32(true, header.uncompressed_page_size, dict_buffer,
+              &header.compressed_page_size, &compressed_data);
+      if (!status.ok()) {
+        return Status(Substitute("Error writing parquet file '$0' column '$1': $2",
+            parent_->output_->current_file_name, column_name(), status.GetDetail()));
+      }
       dict_buffer = compressed_data;
       // We allocated the output based on the guessed size, return the extra allocated
       // bytes back to the mem pool.
@@ -881,8 +896,13 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
     DCHECK_GT(max_compressed_size, 0);
     uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size);
     header.compressed_page_size = max_compressed_size;
-    RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size,
-        uncompressed_data, &header.compressed_page_size, &compressed_data));
+    const Status& status =
+        compressor_->ProcessBlock32(true, header.uncompressed_page_size,
+            uncompressed_data, &header.compressed_page_size, &compressed_data);
+    if (!status.ok()) {
+      return Status(Substitute("Error writing parquet file '$0' column '$1': $2",
+          parent_->output_->current_file_name, column_name(), status.GetDetail()));
+    }
     current_page_->data = compressed_data;
 
     // We allocated the output based on the guessed size, return the extra allocated
@@ -963,14 +983,29 @@ Status HdfsParquetTableWriter::Init() {
     codec = query_options.compression_codec.codec;
     clevel = query_options.compression_codec.compression_level;
   }
+
   if (!(codec == THdfsCompression::NONE ||
         codec == THdfsCompression::GZIP ||
         codec == THdfsCompression::SNAPPY ||
-        codec == THdfsCompression::ZSTD)) {
+        codec == THdfsCompression::ZSTD ||
+        codec == THdfsCompression::LZ4)) {
     stringstream ss;
     ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
     return Status(ss.str());
   }
+
+  // Map parquet codecs to Impala codecs. It's important to do the above check before
+  // we do any mapping.
+  // Parquet supports codecs enumerated in parquet::CompressionCodec. Impala supports
+  // codecs enumerated in impala::THdfsCompression. In most cases, Impala codec and
+  // Parquet codec refer to the same codec. The only exception is LZ4. For Hadoop
+  // compatibility parquet::CompressionCodec::LZ4 refers to THdfsCompression::LZ4_BLOCKED
+  // and not THdfsCompression::LZ4. Hence, the following mapping and re-mapping to ensure
+  // that the input THdfsCompression::LZ4 codec gets mapped to
+  // THdfsCompression::LZ4_BLOCKED for parquet.
+  parquet::CompressionCodec::type parquet_codec = ConvertImpalaToParquetCodec(codec);
+  codec = ConvertParquetToImpalaCodec(parquet_codec);
+
   VLOG_FILE << "Using compression codec: " << codec;
   if (codec == THdfsCompression::ZSTD) {
     VLOG_FILE << "Using compression level: " << clevel;
@@ -999,45 +1034,55 @@ Status HdfsParquetTableWriter::Init() {
   for (int i = 0; i < columns_.size(); ++i) {
     BaseColumnWriter* writer = nullptr;
     const ColumnType& type = output_expr_evals_[i]->root().type();
+    const int num_clustering_cols = table_desc_->num_clustering_cols();
+    const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
     switch (type.type) {
       case TYPE_BOOLEAN:
-        writer = new BoolColumnWriter(this, output_expr_evals_[i], codec_info);
+        writer =
+          new BoolColumnWriter(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_TINYINT:
-        writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_SMALLINT:
-        writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_INT:
-        writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_BIGINT:
-        writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_FLOAT:
-        writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<float>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_DOUBLE:
-        writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec_info);
+        writer =
+          new ColumnWriter<double>(this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_TIMESTAMP:
         switch (state_->query_options().parquet_timestamp_type) {
           case TParquetTimestampType::INT96_NANOS:
             writer =
-                new ColumnWriter<TimestampValue>(this, output_expr_evals_[i], codec_info);
+                new ColumnWriter<TimestampValue>(
+                    this, output_expr_evals_[i], codec_info, col_name);
             break;
           case TParquetTimestampType::INT64_MILLIS:
             writer = new Int64MilliTimestampColumnWriter(
-                this, output_expr_evals_[i], codec_info);
+                this, output_expr_evals_[i], codec_info, col_name);
             break;
           case TParquetTimestampType::INT64_MICROS:
             writer = new Int64MicroTimestampColumnWriter(
-                this, output_expr_evals_[i], codec_info);
+                this, output_expr_evals_[i], codec_info, col_name);
             break;
           case TParquetTimestampType::INT64_NANOS:
             writer = new Int64NanoTimestampColumnWriter(
-                this, output_expr_evals_[i], codec_info);
+                this, output_expr_evals_[i], codec_info, col_name);
             break;
           default:
             DCHECK(false);
@@ -1046,28 +1091,33 @@ Status HdfsParquetTableWriter::Init() {
       case TYPE_VARCHAR:
       case TYPE_STRING:
       case TYPE_CHAR:
-        writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec_info);
+        writer = new ColumnWriter<StringValue>(
+            this, output_expr_evals_[i], codec_info, col_name);
         break;
       case TYPE_DECIMAL:
         switch (output_expr_evals_[i]->root().type().GetByteSize()) {
           case 4:
             writer =
-                new ColumnWriter<Decimal4Value>(this, output_expr_evals_[i], codec_info);
+                new ColumnWriter<Decimal4Value>(
+                    this, output_expr_evals_[i], codec_info, col_name);
             break;
           case 8:
             writer =
-                new ColumnWriter<Decimal8Value>(this, output_expr_evals_[i], codec_info);
+                new ColumnWriter<Decimal8Value>(
+                    this, output_expr_evals_[i], codec_info, col_name);
             break;
           case 16:
             writer =
-                new ColumnWriter<Decimal16Value>(this, output_expr_evals_[i], codec_info);
+                new ColumnWriter<Decimal16Value>(
+                    this, output_expr_evals_[i], codec_info, col_name);
             break;
           default:
             DCHECK(false);
         }
         break;
       case TYPE_DATE:
-        writer = new ColumnWriter<DateValue>(this, output_expr_evals_[i], codec_info);
+        writer = new ColumnWriter<DateValue>(
+            this, output_expr_evals_[i], codec_info, col_name);
         break;
       default:
         DCHECK(false);
@@ -1080,8 +1130,6 @@ Status HdfsParquetTableWriter::Init() {
 }
 
 Status HdfsParquetTableWriter::CreateSchema() {
-  int num_clustering_cols = table_desc_->num_clustering_cols();
-
   // Create flattened tree with a single root.
   file_metadata_.schema.resize(columns_.size() + 1);
   file_metadata_.schema[0].__set_num_children(columns_.size());
@@ -1090,7 +1138,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
   for (int i = 0; i < columns_.size(); ++i) {
     parquet::SchemaElement& col_schema = file_metadata_.schema[i + 1];
     const ColumnType& col_type = output_expr_evals_[i]->root().type();
-    col_schema.name = table_desc_->col_descs()[i + num_clustering_cols].name();
+    col_schema.name = columns_[i]->column_name();
     ParquetMetadataUtils::FillSchemaElement(col_type, state_->query_options(),
                                             &col_schema);
   }
@@ -1104,14 +1152,12 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1];
 
   // Initialize new row group metadata.
-  int num_clustering_cols = table_desc_->num_clustering_cols();
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     parquet::ColumnMetaData metadata;
     metadata.type = ParquetMetadataUtils::ConvertInternalToParquetType(
         columns_[i]->type().type, state_->query_options());
-    metadata.path_in_schema.push_back(
-        table_desc_->col_descs()[i + num_clustering_cols].name());
+    metadata.path_in_schema.push_back(columns_[i]->column_name());
     metadata.codec = columns_[i]->GetParquetCodec();
     current_row_group_->columns[i].__set_meta_data(metadata);
   }
@@ -1274,7 +1320,6 @@ Status HdfsParquetTableWriter::WriteFileHeader() {
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   if (current_row_group_ == nullptr) return Status::OK();
 
-  int num_clustering_cols = table_desc_->num_clustering_cols();
   for (int i = 0; i < columns_.size(); ++i) {
     int64_t data_page_offset, dict_page_offset;
     // Flush this column.  This updates the final metadata sizes for this column.
@@ -1295,7 +1340,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     current_row_group_->total_byte_size += col_writer->total_compressed_size();
     current_row_group_->num_rows = col_writer->num_values();
     current_row_group_->columns[i].file_offset = file_pos_;
-    const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
+    const string& col_name = col_writer->column_name();
     google::protobuf::Map<string,int64>* column_size_map =
         parquet_dml_stats_.mutable_per_column_size();
     (*column_size_map)[col_name] += col_writer->total_compressed_size();
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 0588616..0ed8cd8 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1340,8 +1340,13 @@ Status BaseScalarColumnReader::InitDictionary() {
 
   if (decompressor_.get() != nullptr) {
     int uncompressed_size = current_page_header_.uncompressed_page_size;
-    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
-                    &uncompressed_size, &dict_values));
+    const Status& status = decompressor_->ProcessBlock32(true, data_size, data_,
+        &uncompressed_size, &dict_values);
+    if (!status.ok()) {
+      return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
+               " data_page_offset $2: $3", filename(), node_.element->name,
+               metadata_->data_page_offset, status.GetDetail()));
+    }
     VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
     if (current_page_header_.uncompressed_page_size != uncompressed_size) {
       return Status(Substitute("Error decompressing dictionary page in file '$0'. "
@@ -1465,9 +1470,15 @@ Status BaseScalarColumnReader::ReadDataPage() {
       uint8_t* decompressed_buffer;
       RETURN_IF_ERROR(AllocateUncompressedDataPage(
             uncompressed_size, "decompressed data", &decompressed_buffer));
-      RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
+      const Status& status = decompressor_->ProcessBlock32(true,
           current_page_header_.compressed_page_size, data_, &uncompressed_size,
-          &decompressed_buffer));
+          &decompressed_buffer);
+      if (!status.ok()) {
+        return Status(Substitute("Error decompressing parquet file '$0' column '$1'"
+                 " data_page_offset $2: $3", filename(), node_.element->name,
+                 metadata_->data_page_offset, status.GetDetail()));
+      }
+
       VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
                 << " to " << uncompressed_size;
       if (current_page_header_.uncompressed_page_size != uncompressed_size) {
diff --git a/be/src/exec/parquet/parquet-common.cc b/be/src/exec/parquet/parquet-common.cc
index 20b23dc..285ef0d 100644
--- a/be/src/exec/parquet/parquet-common.cc
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -26,8 +26,9 @@ const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
   THdfsCompression::GZIP,
   THdfsCompression::LZO,
   THdfsCompression::BROTLI,
-  THdfsCompression::LZ4,
-  THdfsCompression::ZSTD};
+  THdfsCompression::LZ4_BLOCKED,
+  THdfsCompression::ZSTD
+};
 
 const int PARQUET_TO_IMPALA_CODEC_SIZE =
     sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);
@@ -46,7 +47,9 @@ const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
   parquet::CompressionCodec::LZ4,
   parquet::CompressionCodec::GZIP,   // ZLIB
   parquet::CompressionCodec::ZSTD,
-  parquet::CompressionCodec::BROTLI};
+  parquet::CompressionCodec::BROTLI,
+  parquet::CompressionCodec::LZ4     // LZ4_BLOCKED
+};
 
 const int IMPALA_TO_PARQUET_CODEC_SIZE =
     sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 02544ca..aece0e1 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -284,7 +284,8 @@ Status ParquetMetadataUtils::ValidateRowGroupColumn(
   if (codec != parquet::CompressionCodec::UNCOMPRESSED &&
       codec != parquet::CompressionCodec::SNAPPY &&
       codec != parquet::CompressionCodec::GZIP &&
-      codec != parquet::CompressionCodec::ZSTD) {
+      codec != parquet::CompressionCodec::ZSTD &&
+      codec != parquet::CompressionCodec::LZ4) {
     return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
         "'$2'.", filename, codec, schema_element.name));
   }
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 97b0b71..46fff23 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -468,7 +468,7 @@ TEST(QueryOptions, CompressionCodec) {
 #define CASE(enumtype, enums) {ENTRIES(enumtype, BOOST_PP_TUPLE_TO_SEQ(enums))}
   TQueryOptions options;
   vector<THdfsCompression::type> codecs = CASE(THdfsCompression, (NONE, DEFAULT, GZIP,
-      DEFLATE, BZIP2, SNAPPY, SNAPPY_BLOCKED, LZO, LZ4, ZLIB, ZSTD, BROTLI));
+      DEFLATE, BZIP2, SNAPPY, SNAPPY_BLOCKED, LZO, LZ4, ZLIB, ZSTD, BROTLI, LZ4_BLOCKED));
   // Test valid values for compression_codec.
   for (auto& codec : codecs) {
     EXPECT_TRUE(SetQueryOption("compression_codec", Substitute("$0",codec), &options,
diff --git a/be/src/util/codec.cc b/be/src/util/codec.cc
index 8f19eee..18508b6 100644
--- a/be/src/util/codec.cc
+++ b/be/src/util/codec.cc
@@ -47,7 +47,7 @@ const Codec::CodecMap Codec::CODEC_MAP = {{"", THdfsCompression::NONE},
     {GZIP_COMPRESSION, THdfsCompression::GZIP},
     {BZIP2_COMPRESSION, THdfsCompression::BZIP2},
     {SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED},
-    {LZ4_COMPRESSION, THdfsCompression::LZ4},
+    {LZ4_COMPRESSION, THdfsCompression::LZ4_BLOCKED},
     {ZSTD_COMPRESSION, THdfsCompression::ZSTD}};
 
 string Codec::GetCodecName(THdfsCompression::type type) {
@@ -114,6 +114,9 @@ Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const CodecInfo& c
       compressor->reset(new ZstandardCompressor(mem_pool, reuse,
           codec_info.compression_level_));
       break;
+    case THdfsCompression::LZ4_BLOCKED:
+      compressor->reset(new Lz4BlockCompressor(mem_pool, reuse));
+      break;
     default: {
       if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
       return Status(Substitute("Unsupported codec: $0", format));
@@ -163,6 +166,9 @@ Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
     case THdfsCompression::ZSTD:
       decompressor->reset(new ZstandardDecompressor(mem_pool, reuse));
       break;
+    case THdfsCompression::LZ4_BLOCKED:
+      decompressor->reset(new Lz4BlockDecompressor(mem_pool, reuse));
+      break;
     default: {
       if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
       return Status(Substitute("Unsupported codec: $0", format));
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 847168d..d5c279c 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -340,3 +340,48 @@ Status ZstandardCompressor::ProcessBlock(bool output_preallocated, int64_t input
   }
   return Status::OK();
 }
+
+Lz4BlockCompressor::Lz4BlockCompressor(MemPool* mem_pool, bool reuse_buffer)
+  : Codec(mem_pool, reuse_buffer) {
+}
+
+int64_t Lz4BlockCompressor::MaxOutputLen(int64_t input_length, const uint8_t* input) {
+  // Hadoop uses a block compression scheme. For more details look at the comments for
+  // the SnappyBlockCompressor implementation above.
+  // If input_length == 0 then only the input_length will be stored in the compressed
+  // block.
+  if (input_length == 0) { return sizeof(int32_t); }
+
+  // The length estimate is the upper bound on the LZ4-compressed size for the
+  // given input_length, plus storage for two ints: the uncompressed length and
+  // the compressed length.
+  return LZ4_compressBound(input_length) + 2 * sizeof(int32_t);
+}
+
+Status Lz4BlockCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
+    const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  DCHECK_GE(input_length, 0);
+  size_t length = MaxOutputLen(input_length, input);
+
+  CHECK(output_preallocated && length <= *output_length)
+    << " Output was not allocated for Lz4 Codec or is not sufficient."
+    << " output_preallocated " << output_preallocated << " length: " << length
+    << " output_length " << *output_length;
+
+  uint8_t* outp = *output;
+  ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
+  outp += sizeof(int32_t);
+  if (input_length > 0) {
+    uint8_t* sizep = outp;
+    outp += sizeof(int32_t);
+    const int64_t size = LZ4_compress_default(reinterpret_cast<const char*>(input),
+        reinterpret_cast<char*>(outp), input_length, *output_length - (outp - *output));
+    if (size == 0) { return Status(TErrorCode::LZ4_COMPRESS_DEFAULT_FAILED); }
+    ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
+    outp += size;
+    DCHECK_LE(outp - *output, length);
+  }
+
+  *output_length = outp - *output;
+  return Status::OK();
+}
diff --git a/be/src/util/compress.h b/be/src/util/compress.h
index 493051d..14e6527 100644
--- a/be/src/util/compress.h
+++ b/be/src/util/compress.h
@@ -149,5 +149,19 @@ class ZstandardCompressor : public Codec {
  private:
   int clevel_;
 };
+
+/// Hadoop's block compression scheme on top of LZ4.
+class Lz4BlockCompressor : public Codec {
+ public:
+  Lz4BlockCompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
+  virtual ~Lz4BlockCompressor() { }
+
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "lz4"; }
+};
 }
 #endif
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 5d7e0ca..e3a22ec 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -22,6 +22,7 @@
 
 #include "gen-cpp/Descriptors_types.h"
 
+#include "exec/read-write-util.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/mem-pool.h"
 #include "testutil/gtest-util.h"
@@ -72,7 +73,8 @@ class DecompressorTest : public ::testing::Test {
     EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
 
     // LZ4 & ZSTD are not implemented to work without an allocated output
-    if (format == THdfsCompression::LZ4 || format == THdfsCompression::ZSTD) {
+    if (format == THdfsCompression::LZ4 || format == THdfsCompression::ZSTD ||
+        format == THdfsCompression::LZ4_BLOCKED) {
       CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
           sizeof(input_), input_);
       CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
@@ -371,7 +373,6 @@ class DecompressorTest : public ::testing::Test {
   // Buffer to hold generated random data that contains repeated letter [a..z] and [A..Z]
   // for compressor/decompressor testing.
   uint8_t input_[2 * 26 * 1024];
-
   // Buffer for testing ProcessBlockStreaming() which allocates STREAM_OUT_BUF_SIZE output
   // buffer. This is 4x the size of the output buffers to ensure that the decompressed output
   // requires several calls and doesn't need to be nicely aligned (the last call gets a
@@ -489,6 +490,78 @@ TEST_F(DecompressorTest, ZSTD) {
   const int clevel = uniform_int_distribution<int>(1, ZSTD_maxCLevel())(rng);
   RunTest(THdfsCompression::ZSTD, clevel);
 }
+
+TEST_F(DecompressorTest, LZ4HadoopCompat) {
+  scoped_ptr<Codec> compressor;
+  scoped_ptr<Codec> decompressor;
+
+  THdfsCompression::type format = THdfsCompression::LZ4_BLOCKED;
+  Codec::CodecInfo codec_info(format);
+  EXPECT_OK(Codec::CreateCompressor(&mem_pool_, true, codec_info, &compressor));
+  EXPECT_OK(Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor));
+
+  // Hadoop uses a block compression scheme on top of lz4. The input stream could be
+  // split into blocks and each block contains the uncompressed length for the block
+  // followed by one or more length-prefixed blocks of compressed data.
+  // Block layout:
+  //     <4 byte big endian uncompressed_size><inner block 1>...<inner block N>
+  //     inner block <i> layout: <4-byte big endian compressed_size><lz4 compressed block>
+  // For this unit test we assume an inner block size of 8K.
+  const int64_t input_len = sizeof(input_);
+  const int64_t block_len = 8192;
+  const int64_t num_blocks = ceil((double)input_len/(double)block_len);
+  const int64_t max_compressed_block_length = compressor->MaxOutputLen(block_len, NULL);
+  const int64_t total_compressed_buffer_length =
+      max_compressed_block_length * num_blocks;
+  uint8_t* const compressed_buffer = mem_pool_.Allocate(total_compressed_buffer_length);
+  // Write the total uncompressed_size
+  ReadWriteUtil::PutInt(compressed_buffer, static_cast<uint32_t>(input_len));
+  int64_t compressed_buffer_index = sizeof(uint32_t);
+  // Preallocate output buffers for compressor
+  uint8_t* const compressed_block = mem_pool_.Allocate(max_compressed_block_length);
+  // Generate Hadoop's compressed block layout
+  const int num_8K_inner_blocks = input_len / block_len;
+  for (int i=0; i < (num_8K_inner_blocks * block_len); i += block_len) {
+    uint8_t* input = &input_[i];
+    uint8_t* compressed = compressed_block;
+    int64_t compressed_length = max_compressed_block_length;
+    EXPECT_OK(compressor->ProcessBlock(true, block_len, input, &compressed_length,
+        &compressed));
+    compressed += sizeof(uint32_t);
+    compressed_length -= sizeof(uint32_t);
+    memcpy(compressed_buffer + compressed_buffer_index, compressed, compressed_length);
+    compressed_buffer_index += compressed_length;
+  }
+  // Compress the last block (not a multiple of 8K bytes), if any
+  if (input_len % block_len != 0) {
+    const int64_t last_block_sz = input_len % block_len;
+    const uint64_t last_block_index = input_len - last_block_sz;
+    uint8_t* input = &input_[last_block_index];
+    uint8_t* compressed = compressed_block;
+    int64_t compressed_length = max_compressed_block_length;
+    EXPECT_OK(compressor->ProcessBlock(true, last_block_sz, input, &compressed_length,
+        &compressed));
+    compressed += sizeof(uint32_t);
+    compressed_length -= sizeof(uint32_t);
+    memcpy(compressed_buffer + compressed_buffer_index, compressed, compressed_length);
+    compressed_buffer_index += compressed_length;
+  }
+  DCHECK_LE(compressed_buffer_index, total_compressed_buffer_length);
+
+  // Now that we have a compressed block using Hadoop's lz4 block compression scheme,
+  // let's decompress it using Impala's LZ4BlockDecompressor.
+  int64_t output_len = input_len;
+  uint8_t* output = mem_pool_.Allocate(output_len);
+  EXPECT_OK(decompressor->ProcessBlock(true, compressed_buffer_index, compressed_buffer,
+      &output_len, &output));
+
+  EXPECT_EQ(output_len, input_len);
+  EXPECT_EQ(memcmp(input_, output, input_len), 0);
+}
+
+TEST_F(DecompressorTest, LZ4Blocked) {
+  RunTest(THdfsCompression::LZ4_BLOCKED);
+}
 }
 
 int main(int argc, char **argv) {
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 438bfbe..000b585 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -626,3 +626,67 @@ Status ZstandardDecompressor::ProcessBlock(bool output_preallocated, int64_t inp
   *output_length = ret;
   return Status::OK();
 }
+
+Lz4BlockDecompressor::Lz4BlockDecompressor(MemPool* mem_pool, bool reuse_buffer)
+  : Codec(mem_pool, reuse_buffer) {
+}
+
+int64_t Lz4BlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
+  DCHECK(input != nullptr) << "Passed null input to Lz4 Decompressor";
+  return -1;
+}
+
+// Decompresses a block compressed using Hadoop's lz4 block compression scheme. The
+// compressed block layout is similar to Hadoop's snappy block compression scheme, with
+// the only difference being the compression codec used. For more details please refer
+// to the comment section for the SnappyBlockDecompress above.
+Status Lz4BlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
+    const uint8_t* input, int64_t* output_len, uint8_t** output) {
+  DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
+  if(*output_len == 0) return Status::OK();
+  uint8_t* out_ptr = *output;
+  int64_t uncompressed_total_len = 0;
+  const int64_t buffer_size = *output_len;
+  *output_len = 0;
+
+  while (input_len > 0) {
+    uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
+    input += sizeof(uint32_t);
+    input_len -= sizeof(uint32_t);
+    int64_t remaining_output_size = buffer_size - uncompressed_total_len;
+    if (remaining_output_size < uncompressed_block_len) {
+      return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
+    }
+
+    while (uncompressed_block_len > 0) {
+      // Check that the input length is not negative.
+      if (input_len < 0) {
+        return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_INVALID_INPUT_LENGTH);
+      }
+      // Read the length of the next lz4 compressed block.
+      size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
+      input += sizeof(uint32_t);
+      input_len -= sizeof(uint32_t);
+
+      if (compressed_len == 0 || compressed_len > input_len) {
+        return Status(TErrorCode::LZ4_BLOCK_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
+      }
+
+      // Decompress this block.
+      int64_t remaining_output_size = buffer_size - uncompressed_total_len;
+      int uncompressed_len = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
+          reinterpret_cast<char*>(out_ptr), compressed_len, remaining_output_size);
+      if (uncompressed_len < 0) {
+        return Status(TErrorCode::LZ4_DECOMPRESS_SAFE_FAILED);
+      }
+
+      out_ptr += uncompressed_len;
+      input += compressed_len;
+      input_len -= compressed_len;
+      uncompressed_block_len -= uncompressed_len;
+      uncompressed_total_len += uncompressed_len;
+    }
+  }
+  *output_len = uncompressed_total_len;
+  return Status::OK();
+}
diff --git a/be/src/util/decompress.h b/be/src/util/decompress.h
index 9ff66fe..0c41bcd 100644
--- a/be/src/util/decompress.h
+++ b/be/src/util/decompress.h
@@ -144,5 +144,19 @@ class ZstandardDecompressor : public Codec {
       uint8_t** output) override WARN_UNUSED_RESULT;
   virtual std::string file_extension() const override { return "zstd"; }
 };
+
+/// Hadoop's block compression scheme on top of LZ4.
+class Lz4BlockDecompressor : public Codec {
+ public:
+  virtual ~Lz4BlockDecompressor() { }
+  Lz4BlockDecompressor(MemPool* mem_pool = nullptr, bool reuse_buffer = false);
+
+  virtual int64_t MaxOutputLen(
+      int64_t input_len, const uint8_t* input = nullptr) override;
+  virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) override WARN_UNUSED_RESULT;
+  virtual std::string file_extension() const override { return "lz4"; }
+};
 }
 #endif
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 84e85f6..01bba1d 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -79,6 +79,7 @@ enum THdfsCompression {
   ZLIB = 9
   ZSTD = 10
   BROTLI = 11
+  LZ4_BLOCKED = 12
 }
 
 enum TColumnEncoding {
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 590d7a8..5e1070d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -416,6 +416,19 @@ error_codes = (
    "The user authorized on the connection '$0' does not match the session username '$1'"),
 
   ("ZSTD_ERROR", 137, "$0 failed with error: $1"),
+
+  ("LZ4_BLOCK_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT", 138,
+   "LZ4Block: Decompressed size is not correct."),
+
+  ("LZ4_BLOCK_DECOMPRESS_INVALID_INPUT_LENGTH", 139,
+   "LZ4Block: Invalid input length."),
+
+  ("LZ4_BLOCK_DECOMPRESS_INVALID_COMPRESSED_LENGTH", 140,
+   "LZ4Block: Invalid compressed length.  Data is likely corrupt."),
+
+  ("LZ4_DECOMPRESS_SAFE_FAILED", 141, "LZ4: LZ4_decompress_safe failed"),
+
+  ("LZ4_COMPRESS_DEFAULT_FAILED", 142, "LZ4: LZ4_compress_default failed"),
 )
 
 import sys
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index c2756bd..12c7989 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -94,7 +94,7 @@ Invalid query option: foo
 ---- QUERY
 set parquet_compression_codec=bar
 ---- CATCH
-Invalid compression codec: 'bar'. Valid values are NONE(0), DEFAULT(1), GZIP(2), DEFLATE(3), BZIP2(4), SNAPPY(5), SNAPPY_BLOCKED(6), LZO(7), LZ4(8), ZLIB(9), ZSTD(10), BROTLI(11).
+Invalid compression codec: 'bar'. Valid values are NONE(0), DEFAULT(1), GZIP(2), DEFLATE(3), BZIP2(4), SNAPPY(5), SNAPPY_BLOCKED(6), LZO(7), LZ4(8), ZLIB(9), ZSTD(10), BROTLI(11), LZ4_BLOCKED(12).
 ====
 ---- QUERY
 set explain_level=bar
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 43ea337..a962279 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -32,7 +32,7 @@ class TableFormatInfo(object):
   KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase']
   if os.environ['KUDU_IS_SUPPORTED'] == 'true':
     KNOWN_FILE_FORMATS.append('kudu')
-  KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo', 'zstd']
+  KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo', 'zstd', 'lz4']
   KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']
 
   def __init__(self, **kwargs):
diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py
new file mode 100644
index 0000000..caf1ef3
--- /dev/null
+++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Tests for Hive-IMPALA parquet compression codec interoperability
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import IMPALA_TEST_CLUSTER_PROPERTIES
+from tests.common.test_dimensions import create_exec_option_dimension
+from tests.common.test_result_verifier import verify_query_result_is_equal
+from tests.util.filesystem_utils import get_fs_path
+
+PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd', 'zstd:7', 'lz4']
+
+
+class TestParquetInterop(CustomClusterTestSuite):
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(CustomClusterTestSuite, cls).add_test_dimensions()
+    # Fix the exec_option vector to have a single value.
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
+        sync_ddl=[1]))
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-convert_legacy_hive_parquet_utc_timestamps=true "
+      "-hdfs_zone_info_zip=%s" % get_fs_path("/test-warehouse/tzdb/2017c.zip"))
+  def test_hive_impala_interop(self, vector, unique_database):
+    # Setup source table.
+    source_table = "{0}.{1}".format(unique_database, "t1_source")
+    self.execute_query_expect_success(self.client,
+        "create table {0} as select * from functional_parquet.alltypes"
+        .format(source_table))
+    self.execute_query_expect_success(self.client,
+        "insert into {0}(id) values (7777), (8888), (9999), (11111), (22222), (33333)"
+        .format(source_table))
+
+    # Loop through the compression codecs and run interop tests.
+    for codec in PARQUET_CODECS:
+      # Write data in Impala.
+      vector.get_value('exec_option')['compression_codec'] = codec
+      impala_table = "{0}.{1}".format(unique_database, "t1_impala")
+      self.execute_query_expect_success(self.client,
+          "drop table if exists {0}".format(impala_table))
+      self.execute_query_expect_success(self.client,
+          "create table {0} stored as parquet as select * from {1}"
+          .format(impala_table, source_table), vector.get_value('exec_option'))
+
+      # Read data from Impala and write in Hive
+      if (codec == 'none'): codec = 'uncompressed'
+      elif (codec == 'zstd:7'): codec = 'zstd'
+      hive_table = "{0}.{1}".format(unique_database, "t1_hive")
+      self.run_stmt_in_hive("drop table if exists {0}".format(hive_table))
+      self.run_stmt_in_hive("set parquet.compression={0};\
+          create table {1} stored as parquet as select * from {2}"
+          .format(codec, hive_table, impala_table))
+
+      # Make sure Impala's metadata is in sync.
+      if IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster():
+        self.wait_for_table_to_appear(unique_database, hive_table, timeout_s=10)
+      else:
+        self.client.execute("invalidate metadata {0}".format(hive_table))
+
+      # Read Hive data in Impala and verify results.
+      base_result = self.execute_query_expect_success(self.client,
+          "select * from {0} order by id".format(source_table))
+      test_result = self.execute_query_expect_success(self.client,
+          "select * from {0} order by id".format(hive_table))
+      verify_query_result_is_equal(test_result.data, base_result.data)
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 07634c5..dd2c4f9 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -35,7 +35,7 @@ from tests.common.test_result_verifier import (
 from tests.common.test_vector import ImpalaTestDimension
 from tests.verifiers.metric_verifier import MetricVerifier
 
-PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd']
+PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd', 'lz4']
 
 class TestInsertQueries(ImpalaTestSuite):
   @classmethod
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index b207c9b..f224c73 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -37,7 +37,7 @@ from tests.util.filesystem_utils import get_fs_path
 from tests.util.get_parquet_metadata import (decode_stats_value,
     get_parquet_metadata_from_hdfs_folder)
 
-PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd']
+PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd', 'lz4']
 
 
 class RoundFloat():