Posted to commits@impala.apache.org by mj...@apache.org on 2016/08/05 22:46:39 UTC

incubator-impala git commit: IMPALA-3376: Extra definition level when writing Parquet files

Repository: incubator-impala
Updated Branches:
  refs/heads/master 834365618 -> 45d059855


IMPALA-3376: Extra definition level when writing Parquet files

Currently, when writing a new value to a Parquet file, we write the
definition level before checking whether there is enough space on the
current page for the value. If there isn't, we create a new page and
rewrite the definition level to it, but the definition level for that
value is left behind on the old page, giving that page an extra level.

To fix this, we now make sure there is enough space to write both the
definition level and the value before writing either of them.
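
As an illustration only (this is not the Impala writer code; Page,
kMaxDefLevels, kMaxValues and AppendRow below are toy stand-ins for the
real def-level encoder and data page), a minimal self-contained model of
the corrected ordering looks like this:

  // Model of the fixed ordering: reserve room for the def level, encode
  // the value (rolling over to a new page if needed), and only then
  // record the def level on the page the value actually landed on.
  #include <cassert>
  #include <string>
  #include <vector>

  struct Page {
    std::vector<int> def_levels;
    std::vector<std::string> values;
  };

  constexpr size_t kMaxDefLevels = 3;  // toy per-page capacities
  constexpr size_t kMaxValues = 2;
  std::vector<Page> pages(1);

  void AppendRow(const std::string* value) {
    // 1. Ensure a def level will fit, but do not write it yet.
    if (pages.back().def_levels.size() == kMaxDefLevels) pages.emplace_back();

    // 2. Encode the (non-null) value, rolling to a new page if it does not fit.
    if (value != nullptr) {
      if (pages.back().values.size() == kMaxValues) pages.emplace_back();
      pages.back().values.push_back(*value);
    }

    // 3. Only now record the def level.
    pages.back().def_levels.push_back(value != nullptr ? 1 : 0);
  }

  int main() {
    const std::string v = "x";
    for (int i = 0; i < 6; ++i) AppendRow(&v);
    // With the old ordering (def level written before checking value space),
    // a page could end up with more def levels than values; with the fix they
    // stay in sync for all-non-null input.
    for (const Page& p : pages) assert(p.def_levels.size() == p.values.size());
    return 0;
  }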

This patch also extends the parquet-reader tool, which reads Parquet
files and performs minimal sanity checking on their metadata, to check
for extra definition levels, and adds a test that runs the tool
automatically.

Change-Id: I20f25a90aa1ef74b4f00f38f832bc1c1853342c6
Reviewed-on: http://gerrit.cloudera.org:8080/3835
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Internal Jenkins
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/45d05985
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/45d05985
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/45d05985

Branch: refs/heads/master
Commit: 45d059855b4afc2d5730c5e7ecaed7141993f354
Parents: 8343656
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Aug 2 16:19:46 2016 -0700
Committer: Matthew Jacobs <mj...@cloudera.com>
Committed: Fri Aug 5 22:45:18 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc | 39 ++++++++++-----
 be/src/util/parquet-reader.cc            | 69 +++++++++++++++++++++++++--
 be/src/util/rle-encoding.h               |  1 +
 tests/query_test/test_insert_parquet.py  | 44 +++++++++++++++++
 4 files changed, 137 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45d05985/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index ce1bbb1..a0abdae 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -369,30 +369,35 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
   void* value = expr_ctx_->GetValue(row);
   if (current_page_ == NULL) NewPage();
 
-  // We might need to try again if this current page is not big enough
-  while (true) {
-    if (!def_levels_->Put(value != NULL)) {
-      FinalizeCurrentPage();
-      NewPage();
-      bool ret = def_levels_->Put(value != NULL);
-      DCHECK(ret);
-    }
+  // Ensure that we have enough space for the definition level, but don't write it yet in
+  // case we don't have enough space for the value.
+  if (def_levels_->buffer_full()) {
+    FinalizeCurrentPage();
+    NewPage();
+  }
 
+  // Encoding may fail for several reasons - because the current page is not big enough,
+  // because we've encoded the maximum number of unique dictionary values and need to
+  // switch to plain encoding, etc. so we may need to try again more than once.
+  // TODO: Have a clearer set of state transitions here, to make it easier to see that
+  // this won't loop forever.
+  while (true) {
     // Nulls don't get encoded.
     if (value == NULL) break;
-    ++current_page_->num_non_null;
 
     int64_t bytes_needed = 0;
-    if (EncodeValue(value, &bytes_needed)) break;
+    if (EncodeValue(value, &bytes_needed)) {
+      ++current_page_->num_non_null;
+      break;
+    }
 
     // Value didn't fit on page, try again on a new page.
     FinalizeCurrentPage();
 
-    // Check how much space it is needed to write this value. If that is larger than the
+    // Check how much space is needed to write this value. If that is larger than the
     // page size then increase page size and try again.
     if (UNLIKELY(bytes_needed > page_size_)) {
-      page_size_ = bytes_needed;
-      if (page_size_ > MAX_DATA_PAGE_SIZE) {
+      if (bytes_needed > MAX_DATA_PAGE_SIZE) {
         stringstream ss;
         ss << "Cannot write value of size "
            << PrettyPrinter::Print(bytes_needed, TUnit::BYTES) << " bytes to a Parquet "
@@ -400,11 +405,19 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
            << PrettyPrinter::Print(MAX_DATA_PAGE_SIZE , TUnit::BYTES) << ".";
         return Status(ss.str());
       }
+      page_size_ = bytes_needed;
       values_buffer_len_ = page_size_;
       values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
     }
     NewPage();
   }
+
+  // Now that the value has been successfully written, write the definition level.
+  bool ret = def_levels_->Put(value != NULL);
+  // Writing the def level will succeed because we ensured there was enough space for it
+  // above, and new pages will always have space for at least a single def level.
+  DCHECK(ret);
+
   ++current_page_->header.data_page_header.num_values;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45d05985/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e4bec3e..3c17110 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -32,6 +32,9 @@
 #include <thrift/transport/TBufferTransports.h>
 #pragma clang diagnostic pop
 
+#include "exec/parquet-common.h"
+#include "runtime/mem-pool.h"
+#include "util/codec.h"
 #include "util/rle-encoding.h"
 
 #include "common/names.h"
@@ -127,6 +130,67 @@ string GetSchema(const FileMetaData& md) {
   return ss.str();
 }
 
+// Inherit from RleDecoder to get access to repeat_count_, which is protected.
+class ParquetLevelReader : public impala::RleDecoder {
+ public:
+  ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) :
+    RleDecoder(buffer, buffer_len, bit_width) {}
+
+  uint32_t repeat_count() const { return repeat_count_; }
+};
+
+// Performs sanity checking on the contents of data pages, to ensure that:
+//   - Compressed pages can be uncompressed successfully.
+//   - The number of def levels matches num_values in the page header when using RLE.
+//     Note that this will not catch every instance of Impala writing the wrong number of
+//     def levels - with our RLE scheme it is not possible to determine how many values
+//     were actually written if the final run is a literal run, only if the final run is
+//     a repeated run (see util/rle-encoding.h for more details).
+void CheckDataPage(const ColumnChunk& col, const PageHeader& header,
+    const uint8_t* page) {
+  const uint8_t* data = page;
+  std::vector<uint8_t> decompressed_buffer;
+  if (col.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
+    decompressed_buffer.resize(header.uncompressed_page_size);
+
+    boost::scoped_ptr<impala::Codec> decompressor;
+    impala::Codec::CreateDecompressor(NULL, false,
+        impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
+
+    uint8_t* buffer_ptr = decompressed_buffer.data();
+    int uncompressed_page_size = header.uncompressed_page_size;
+    impala::Status s = decompressor->ProcessBlock32(true, header.compressed_page_size,
+        data, &uncompressed_page_size, &buffer_ptr);
+    if (!s.ok()) {
+      cerr << "Error: Decompression failed: " << s.GetDetail() << " \n";
+      exit(1);
+    }
+
+    data = decompressed_buffer.data();
+  }
+
+  if (header.data_page_header.definition_level_encoding == parquet::Encoding::RLE) {
+    // Parquet data pages always start with the encoded definition level data, and
+    // RLE sections in Parquet always start with a 4 byte length followed by the data.
+    int num_def_level_bytes = *reinterpret_cast<const int32_t*>(data);
+    ParquetLevelReader def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
+        num_def_level_bytes, sizeof(uint8_t));
+    uint8_t level;
+    for (int i = 0; i < header.data_page_header.num_values; ++i) {
+      if (!def_levels.Get(&level)) {
+        cerr << "Error: Decoding of def levels failed.\n";
+        exit(1);
+      }
+
+      if (i + def_levels.repeat_count() + 1 > header.data_page_header.num_values) {
+        cerr << "Error: More def levels encoded (" << (i + def_levels.repeat_count() + 1)
+             << ") than num_values (" << header.data_page_header.num_values << ").\n";
+        exit(1);
+      }
+    }
+  }
+}
+
 // Simple utility to read parquet files on local disk.  This utility validates the
 // file is correctly formed and can output values from each data page.  The
 // entire file is buffered in memory so this is not suitable for very large files.
@@ -183,9 +247,6 @@ int main(int argc, char** argv) {
   int total_uncompressed_data_size = 0;
   vector<int> column_sizes;
 
-  // Buffer to decompress data into.  Reused across pages.
-  vector<char> decompression_buffer;
-
   for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
     cerr << "Reading row group " << i << endl;
     RowGroup& rg = file_metadata.row_groups[i];
@@ -214,6 +275,8 @@ int main(int argc, char** argv) {
         }
 
         data += header_size;
+        if (header.__isset.data_page_header) CheckDataPage(col, header, data);
+
         total_page_header_size += header_size;
         column_sizes[c] += header.compressed_page_size;
         total_compressed_data_size += header.compressed_page_size;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45d05985/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 030767d..bebb6d9 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -176,6 +176,7 @@ class RleEncoder {
   /// Returns pointer to underlying buffer
   uint8_t* buffer() { return bit_writer_.buffer(); }
   int32_t len() { return bit_writer_.bytes_written(); }
+  bool buffer_full() const { return buffer_full_; }
 
  private:
   /// Flushes any buffered values.  If this is part of a repeated run, this is largely

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/45d05985/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index df09bab..101fbc5 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -1,8 +1,14 @@
 # Copyright (c) 2012 Cloudera, Inc. All rights reserved.
 # Targeted Impala insert tests
 
+import os
 import pytest
 
+from shutil import rmtree
+from subprocess import check_call
+from tempfile import mkdtemp as make_tmp_dir
+
+from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
@@ -138,3 +144,41 @@ class TestInsertParquetVerifySize(ImpalaTestSuite):
       if size < BLOCK_SIZE * 0.80:
         assert found_small_file == False
         found_small_file = True
+
+class TestHdfsParquetTableWriter(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestHdfsParquetTableWriter, cls).add_test_dimensions()
+    cls.TestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_def_level_encoding(self, vector, unique_database):
+    """IMPALA-3376: Tests that parquet files are written to HDFS correctly by generating a
+    parquet table and running the parquet-reader tool on it, which performs sanity
+    checking, such as that the correct number of definition levels were encoded.
+    """
+    table_name = "test_hdfs_parquet_table_writer"
+    qualified_table_name = "%s.%s" % (unique_database, table_name)
+    self.execute_query("drop table if exists %s" % qualified_table_name)
+    self.execute_query("create table %s stored as parquet as select l_linenumber from "
+        "tpch_parquet.lineitem limit 180000" % qualified_table_name)
+
+    tmp_dir = make_tmp_dir()
+    try:
+      hdfs_file = get_fs_path('/test-warehouse/%s.db/%s/*.parq'
+          % (unique_database, table_name))
+      check_call(['hdfs', 'dfs', '-copyToLocal', hdfs_file, tmp_dir])
+
+      for root, subdirs, files in os.walk(tmp_dir):
+        for f in files:
+          if not f.endswith('parq'):
+            continue
+          check_call([os.path.join(impalad_basedir, 'util/parquet-reader'), '--file',
+              os.path.join(tmp_dir, str(f))])
+    finally:
+      self.execute_query("drop table %s" % qualified_table_name)
+      rmtree(tmp_dir)