You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/08/11 22:04:22 UTC

[1/2] incubator-impala git commit: IMPALA-3764, 3914: fuzz test HDFS scanners and fix parquet bugs found

Repository: incubator-impala
Updated Branches:
  refs/heads/master 286da5921 -> 9162d5d05


IMPALA-3764,3914: fuzz test HDFS scanners and fix parquet bugs found

This adds a test that performs some simple fuzz testing of HDFS
scanners. It creates a copy of a given HDFS table, with each
file in the table corrupted in a random way: either a single
byte is set to a random value, or the file is truncated to a
random length. It then runs a query that scans the whole table
with several different batch_size settings. I made some effort
to make the failures reproducible by explicitly seeding the
random number generator, and providing a mechanism to override
the seed.

The fuzzer has found crashes resulting from corrupted or truncated
input files for RCFile, SequenceFile, Parquet, and Text LZO so far.
Avro only had a small buffer read overrun detected by ASAN.

Includes fixes for Parquet crashes found by the fuzzer, a small
buffer overrun in Avro, and a DCHECK in MemPool.

Initially it is only enabled for Avro, Parquet, and uncompressed
text. As follow-up work we should fix the bugs in the other scanners
and enable the test for them.

We also don't implement abort_on_error=0 correctly in Parquet, and
for some file formats corrupt headers result in the query being
aborted, so the test is marked xfail if the query throws an exception.

Testing:
Ran the test with exploration_strategy=exhaustive in a loop locally
with both DEBUG and ASAN builds for a couple of days over a weekend.
Also ran an exhaustive private build.

Change-Id: I50cf43195a7c582caa02c85ae400ea2256fa3a3b
Reviewed-on: http://gerrit.cloudera.org:8080/3833
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5afd9f7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5afd9f7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5afd9f7d

Branch: refs/heads/master
Commit: 5afd9f7df765006c067ef5f57d7f7431fe9e1247
Parents: 286da59
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 2 11:02:02 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 11 08:42:41 2016 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc            |   5 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  28 ++-
 be/src/exec/parquet-column-readers.cc           |  36 +++-
 be/src/exec/parquet-column-readers.h            |   4 +-
 be/src/exec/parquet-metadata-utils.cc           |  47 +++--
 be/src/exec/parquet-metadata-utils.h            |   9 +
 be/src/runtime/disk-io-mgr.cc                   |  11 +-
 be/src/runtime/scoped-buffer.h                  |  68 +++++++
 be/src/util/bit-stream-utils.h                  |   9 +-
 be/src/util/bit-stream-utils.inline.h           |   7 +-
 be/src/util/compress.cc                         |   5 +
 be/src/util/dict-encoding.h                     |  15 +-
 be/src/util/dict-test.cc                        |   3 +-
 be/src/util/rle-encoding.h                      |  10 +-
 be/src/util/rle-test.cc                         |   2 +-
 .../queries/QueryTest/parquet.test              |   2 +-
 tests/common/impala_test_suite.py               |  10 +
 tests/query_test/test_scanners.py               |   9 -
 tests/query_test/test_scanners_fuzz.py          | 203 +++++++++++++++++++
 19 files changed, 427 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 5ac7954..5429e04 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -200,7 +200,10 @@ Status BaseSequenceScanner::ReadSync() {
   uint8_t* hash;
   int64_t out_len;
   RETURN_IF_FALSE(stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_));
-  if (out_len != SYNC_HASH_SIZE || memcmp(hash, header_->sync, SYNC_HASH_SIZE)) {
+  if (out_len != SYNC_HASH_SIZE) {
+    return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte "
+        "synchronization marker", out_len, SYNC_HASH_SIZE));
+  } else if (memcmp(hash, header_->sync, SYNC_HASH_SIZE) != 0) {
     stringstream ss;
     ss  << "Bad synchronization marker" << endl
         << "  Expected: '"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 5582267..b0fd008 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -35,6 +35,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/scoped-buffer.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/string-value.h"
@@ -78,6 +79,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
     // Compute the offset of the file footer.
     int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
     int64_t footer_start = files[i]->file_length - footer_size;
+    DCHECK_GE(footer_start, 0);
 
     // Try to find the split with the footer.
     DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]);
@@ -311,6 +313,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
       return Status::OK();
     }
     assemble_rows_timer_.Start();
+    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
     int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
     int max_tuples = min(row_batch->capacity(), rows_remaining);
     TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
@@ -496,7 +499,14 @@ Status HdfsParquetScanner::AssembleRows(
         return Status::OK();
       }
       // Check that all column readers populated the same number of values.
-      if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples);
+      if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) {
+        parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
+            "had $2 remaining values but expected $3", filename(),
+            col_reader->schema_element().name, last_num_tuples,
+            scratch_batch_->num_tuples));
+        *skip_row_group = true;
+        return Status::OK();
+      }
       last_num_tuples = scratch_batch_->num_tuples;
     }
     row_group_rows_read_ += scratch_batch_->num_tuples;
@@ -788,7 +798,7 @@ Status HdfsParquetScanner::ProcessFooter() {
   uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;
   // If the metadata was too big, we need to stitch it before deserializing it.
   // In that case, we stitch the data in this buffer.
-  vector<uint8_t> metadata_buffer;
+  ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
 
   DCHECK(metadata_range_ != NULL);
   if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
@@ -803,7 +813,7 @@ Status HdfsParquetScanner::ProcessFooter() {
       sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - metadata_size;
     int64_t metadata_bytes_to_read = metadata_size;
     if (metadata_start < 0) {
-      return Status(Substitute("File $0 is invalid. Invalid metadata size in file "
+      return Status(Substitute("File '$0' is invalid. Invalid metadata size in file "
           "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size,
           file_desc->file_length));
     }
@@ -812,8 +822,12 @@ Status HdfsParquetScanner::ProcessFooter() {
     // TODO: consider moving this stitching into the scanner context. The scanner
     // context usually handles the stitching but no other scanner need this logic
     // now.
-    metadata_buffer.resize(metadata_size);
-    metadata_ptr = &metadata_buffer[0];
+
+    if (!metadata_buffer.TryAllocate(metadata_size)) {
+      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
+          "metadata for file '$1'.", metadata_size, filename()));
+    }
+    metadata_ptr = metadata_buffer.buffer();
     int64_t copy_offset = 0;
     DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
 
@@ -856,6 +870,10 @@ Status HdfsParquetScanner::ProcessFooter() {
     return Status(
         Substitute("Invalid file. This file: $0 has no row groups", filename()));
   }
+  if (file_metadata_.num_rows < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in "
+        "file metadata", filename(), file_metadata_.num_rows));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 3e8f33c..c7e3e17 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -62,6 +62,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to al
 Status ParquetLevelDecoder::Init(const string& filename,
     parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  DCHECK_GE(num_buffered_values, 0);
   encoding_ = encoding;
   max_level_ = max_level;
   num_buffered_values_ = num_buffered_values;
@@ -95,7 +96,10 @@ Status ParquetLevelDecoder::Init(const string& filename,
       return Status(ss.str());
     }
   }
-  DCHECK_GT(num_bytes, 0);
+  if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
+    return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but "
+        "only $2 bytes left in page", filename, num_bytes, data_size));
+  }
   *data += num_bytes;
   *data_size -= num_bytes;
   return Status::OK();
@@ -404,6 +408,8 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   virtual Status InitDataPage(uint8_t* data, int size) {
+    // Data can be empty if the column contains all NULLs
+    DCHECK_GE(size, 0);
     page_encoding_ = current_page_header_.data_page_header.encoding;
     if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
         page_encoding_ != parquet::Encoding::PLAIN) {
@@ -419,7 +425,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       if (!dict_decoder_init_) {
         return Status("File corrupt. Missing dictionary page.");
       }
-      dict_decoder_.SetData(data, size);
+      RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
     }
 
     // TODO: Perform filter selectivity checks here.
@@ -757,6 +763,15 @@ Status BaseScalarColumnReader::ReadDataPage() {
 
     int data_size = current_page_header_.compressed_page_size;
     int uncompressed_size = current_page_header_.uncompressed_page_size;
+    if (UNLIKELY(data_size < 0)) {
+      return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+          "column '$2'", filename(), data_size, schema_element().name));
+    }
+    if (UNLIKELY(uncompressed_size < 0)) {
+      return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+          "size $1 for column '$2'", filename(), uncompressed_size,
+          schema_element().name));
+    }
 
     if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
       if (slot_desc_ == NULL) {
@@ -853,7 +868,12 @@ Status BaseScalarColumnReader::ReadDataPage() {
     // statistics. See IMPALA-2208 and PARQUET-251.
     if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
     data_end_ = data_ + data_size;
-    num_buffered_values_ = current_page_header_.data_page_header.num_values;
+    int num_values = current_page_header_.data_page_header.num_values;
+    if (num_values < 0) {
+      return Status(Substitute("Error reading data page in Parquet file '$0'. "
+          "Invalid number of values in metadata: $1", filename(), num_values));
+    }
+    num_buffered_values_ = num_values;
     num_values_read_ += num_buffered_values_;
 
     if (decompressor_.get() != NULL) {
@@ -902,7 +922,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
         max_def_level(), num_buffered_values_, &data_, &data_size));
 
     // Data can be empty if the column contains all NULLs
-    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    RETURN_IF_ERROR(InitDataPage(data_, data_size));
     break;
   }
 
@@ -920,6 +940,14 @@ bool BaseScalarColumnReader::NextLevels() {
 
   // Definition level is not present if column and any containing structs are required.
   def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+  // The compiler can optimize these two conditions into a single branch by treating
+  // def_level_ as unsigned.
+  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
+    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
+        "invalid def level $1 > max def level $2 for column '$3'", filename(),
+        def_level_, max_def_level(), schema_element().name)));
+    return false;
+  }
 
   if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
     // Repetition level is only present if this column is nested in any collection type.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 930e6bb..8435e71 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -428,7 +428,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   virtual void ClearDictionaryDecoder() = 0;
 
   /// Initializes the reader with the data contents. This is the content for the entire
-  /// decompressed data page. Decoders can initialize state from here.
+  /// decompressed data page. Decoders can initialize state from here. The caller must
+  /// validate the input such that 'size' is non-negative and that 'data' has at least
+  /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index 52ae933..1b694ed 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -56,32 +56,41 @@ Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
     int64_t file_length, const parquet::RowGroup& row_group) {
   for (int i = 0; i < row_group.columns.size(); ++i) {
     const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+        col_chunk.meta_data.data_page_offset, "data page offset"));
     int64_t col_start = col_chunk.meta_data.data_page_offset;
     // The file format requires that if a dictionary page exists, it be before data pages.
     if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+            col_chunk.meta_data.dictionary_page_offset, "dictionary page offset"));
       if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
-        stringstream ss;
-        ss << "File " << filename << ": metadata is corrupt. "
-            << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
-            << ") must come before any data pages (offset=" << col_start << ").";
-        return Status(ss.str());
+        return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary "
+            "page (offset=$1) must come before any data pages (offset=$2).",
+            filename, col_chunk.meta_data.dictionary_page_offset, col_start));
       }
       col_start = col_chunk.meta_data.dictionary_page_offset;
     }
     int64_t col_len = col_chunk.meta_data.total_compressed_size;
     int64_t col_end = col_start + col_len;
     if (col_end <= 0 || col_end > file_length) {
-      stringstream ss;
-      ss << "File " << filename << ": metadata is corrupt. "
-          << "Column " << i << " has invalid column offsets "
-          << "(offset=" << col_start << ", size=" << col_len << ", "
-          << "file_size=" << file_length << ").";
-      return Status(ss.str());
+      return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has "
+          "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i,
+          col_start, col_len, file_length));
     }
   }
   return Status::OK();
 }
 
+Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx,
+    int64_t file_length, int64_t offset, const string& offset_name) {
+  if (offset < 0 || offset >= file_length) {
+    return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid "
+        "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset,
+        file_length));
+  }
+  return Status::OK();;
+}
+
 static bool IsEncodingSupported(parquet::Encoding::type e) {
   switch (e) {
     case parquet::Encoding::PLAIN:
@@ -128,8 +137,10 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me
   if (slot_desc == NULL) return Status::OK();
 
   parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
-  DCHECK_EQ(type, file_data.meta_data.type)
-      << "Should have been validated in ResolvePath()";
+  if (UNLIKELY(type != file_data.meta_data.type)) {
+    return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 "
+        "actual $2: file may be corrupt", filename, type, file_data.meta_data.type));
+  }
 
   // Check the decimal scale in the file matches the metastore scale and precision.
   // We fail the query if the metadata makes it impossible for us to safely read
@@ -318,7 +329,7 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
     const {
   if (*idx >= schema.size()) {
-    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
+    return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from "
             "flattened schema in file metadata", filename_));
   }
   node->element = &schema[*idx];
@@ -329,6 +340,14 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     // file_metadata_.row_groups.columns
     node->col_idx = *col_idx;
     ++(*col_idx);
+  } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
+    // Sanity-check the schema to avoid allocating absurdly large buffers below.
+    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit of "
+        "$2. File is likely corrupt", filename_, node->element->num_children,
+        SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
+  } else if (node->element->num_children < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
+        filename_, node->element->num_children));
   }
 
   // def_level_of_immediate_repeated_ancestor does not include this node, so set before

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
index 7e1db31..7a1e897 100644
--- a/be/src/exec/parquet-metadata-utils.h
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -39,6 +39,11 @@ class ParquetMetadataUtils {
   static Status ValidateColumnOffsets(const string& filename, int64_t file_length,
       const parquet::RowGroup& row_group);
 
+  /// Check that a file offset is in the file. Return an error status with a detailed
+  /// error message if it is not.
+  static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
+      int64_t file_length, int64_t offset, const std::string& offset_name);
+
   /// Validates the column metadata to make sure this column is supported (e.g. encoding,
   /// type, etc) and matches the type of given slot_desc.
   static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
@@ -144,6 +149,10 @@ class ParquetSchemaResolver {
       bool* missing_field) const;
 
  private:
+  /// An arbitrary limit on the number of children per schema node we support.
+  /// Used to sanity-check Parquet schemas.
+  static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
+
   /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
   /// SchemaNode representation. Returns the result in 'node' unless an error status is
   /// returned. Does not set the slot_desc field of any SchemaNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 88ea035..5df69ed 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -519,10 +519,13 @@ int64_t DiskIoMgr::GetReadThroughput() {
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
-    stringstream ss;
-    ss << "Invalid scan range.  Bad disk id: " << disk_id;
-    DCHECK(false) << ss.str();
-    return Status(ss.str());
+    return Status(Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
+  }
+  if (range->offset_ < 0) {
+    return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_));
+  }
+  if (range->len_ < 0) {
+    return Status(Substitute("Invalid scan range. Negative length $0", range->len_));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/scoped-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/scoped-buffer.h b/be/src/runtime/scoped-buffer.h
new file mode 100644
index 0000000..4841f7f
--- /dev/null
+++ b/be/src/runtime/scoped-buffer.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_SCOPED_BUFFER_H
+#define IMPALA_RUNTIME_SCOPED_BUFFER_H
+
+#include "runtime/mem-tracker.h"
+
+namespace {
+
+/// A scoped memory allocation that is tracked against a MemTracker.
+/// The allocation is automatically freed when the ScopedBuffer object goes out of scope.
+class ScopedBuffer {
+ public:
+  ScopedBuffer(MemTracker* mem_tracker) : mem_tracker_(mem_tracker),
+      buffer_(NULL), bytes_(0) {}
+  ~ScopedBuffer() { Release(); }
+
+  /// Try to allocate a buffer of size 'bytes'. Returns false if MemTracker::TryConsume()
+  /// or malloc() fails.
+  /// Should not be called if a buffer is already allocated.
+  bool TryAllocate(int64_t bytes) {
+    DCHECK(buffer_ == NULL);
+    DCHECK_GT(bytes, 0);
+    if (!mem_tracker_->TryConsume(bytes)) return false;
+    buffer_ = reinterpret_cast<uint8_t*>(malloc(bytes));
+    if (UNLIKELY(buffer_ == NULL)) {
+      mem_tracker_->Release(bytes);
+      return false;
+    }
+    bytes_ = bytes;
+    return true;
+  }
+
+  void Release() {
+    if (buffer_ == NULL) return;
+    free(buffer_);
+    buffer_ = NULL;
+    mem_tracker_->Release(bytes_);
+    bytes_ = 0;
+  }
+
+  inline uint8_t* buffer() const { return buffer_; }
+
+ private:
+  MemTracker* mem_tracker_;
+  uint8_t* buffer_;
+  /// The current size of the allocated buffer, if not NULL.
+  int64_t bytes_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 48ced18..ce159cb 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -77,6 +77,9 @@ class BitWriter {
   /// to the next byte boundary.
   void Flush(bool align=false);
 
+  /// Maximum supported bitwidth for writer.
+  static const int MAX_BITWIDTH = 32;
+
  private:
   uint8_t* buffer_;
   int max_bytes_;
@@ -123,7 +126,8 @@ class BitReader {
   bool GetAligned(int num_bytes, T* v);
 
   /// Reads a vlq encoded int from the stream.  The encoded int must start at the
-  /// beginning of a byte. Return false if there were not enough bytes in the buffer.
+  /// beginning of a byte. Return false if there were not enough bytes in the buffer or
+  /// the int is invalid.
   bool GetVlqInt(int32_t* v);
 
   /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
@@ -133,6 +137,9 @@ class BitReader {
   /// Maximum byte length of a vlq encoded int
   static const int MAX_VLQ_BYTE_LEN = 5;
 
+  /// Maximum supported bitwidth for reader.
+  static const int MAX_BITWIDTH = 32;
+
  private:
   uint8_t* buffer_;
   int max_bytes_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index 4249bc5..fd77974 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -25,7 +25,7 @@ namespace impala {
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
   // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
   DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
 
   if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
@@ -88,7 +88,7 @@ template<typename T>
 inline bool BitReader::GetValue(int num_bits, T* v) {
   DCHECK(buffer_ != NULL);
   // TODO: revisit this limit if necessary
-  DCHECK_LE(num_bits, 32);
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
   DCHECK_LE(num_bits, sizeof(T) * 8);
 
   if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
@@ -140,13 +140,12 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
 inline bool BitReader::GetVlqInt(int32_t* v) {
   *v = 0;
   int shift = 0;
-  int num_bytes = 0;
   uint8_t byte = 0;
   do {
+    if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false;
     if (!GetAligned<uint8_t>(1, &byte)) return false;
     *v |= (byte & 0x7F) << shift;
     shift += 7;
-    DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
   } while ((byte & 0x80) != 0);
   return true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 1676a50..7c97737 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -114,6 +114,7 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
 Status GzipCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t* output_length,
     uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
   int64_t max_compressed_len = MaxOutputLen(input_length);
   if (!output_preallocated) {
@@ -146,6 +147,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
   // The bz2 library does not allow input to be NULL, even when input_length is 0. This
   // should be OK because we do not write any file formats that support bzip compression.
   DCHECK(input != NULL);
+  DCHECK_GE(input_length, 0);
 
   if (output_preallocated) {
     buffer_length_ = *output_length;
@@ -201,6 +203,7 @@ int64_t SnappyBlockCompressor::MaxOutputLen(int64_t input_len, const uint8_t* in
 Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t *output_length,
     uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   // Hadoop uses a block compression scheme on top of snappy.  First there is
   // an integer which is the size of the decompressed data followed by a
   // sequence of compressed blocks each preceded with an integer size.
@@ -252,6 +255,7 @@ int64_t SnappyCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input)
 
 Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   int64_t max_compressed_len = MaxOutputLen(input_length);
   if (output_preallocated && *output_length < max_compressed_len) {
     return Status("SnappyCompressor::ProcessBlock: output length too small");
@@ -292,6 +296,7 @@ int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
 
 Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec";
   if (input_length == 0) return Status::OK();
   *output_length = LZ4_compress(reinterpret_cast<const char*>(input),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 09a3d2d..d9fbe08 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -23,6 +23,7 @@
 #include <boost/unordered_map.hpp>
 
 #include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
 #include "exec/parquet-common.h"
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
@@ -166,14 +167,20 @@ class DictEncoder : public DictEncoderBase {
 /// by the caller and valid as long as this object is.
 class DictDecoderBase {
  public:
-  /// The rle encoded indices into the dictionary.
-  void SetData(uint8_t* buffer, int buffer_len) {
-    DCHECK_GT(buffer_len, 0);
+  /// The rle encoded indices into the dictionary. Returns an error status if the buffer
+  /// is too short or the bit_width metadata in the buffer is invalid.
+  Status SetData(uint8_t* buffer, int buffer_len) {
+    DCHECK_GE(buffer_len, 0);
+    if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
     uint8_t bit_width = *buffer;
-    DCHECK_GE(bit_width, 0);
+    if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
+      return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
+          "width: $0", bit_width));
+    }
     ++buffer;
     --buffer_len;
     data_decoder_.Reset(buffer, buffer_len, bit_width);
+    return Status::OK();
   }
 
   virtual ~DictDecoderBase() {}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index ea6536c..10d5e3d 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -25,6 +25,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
+#include "testutil/test-macros.h"
 #include "util/dict-encoding.h"
 
 #include "common/names.h"
@@ -53,7 +54,7 @@ void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
   DictDecoder<T> decoder;
   ASSERT_TRUE(
       decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
-  decoder.SetData(data_buffer, data_len);
+  ASSERT_OK(decoder.SetData(data_buffer, data_len));
   for (T i: values) {
     T j;
     decoder.GetValue(&j);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 6f993d5..9f39697 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -88,14 +88,14 @@ class RleDecoder {
       repeat_count_(0),
       literal_count_(0) {
     DCHECK_GE(bit_width_, 0);
-    DCHECK_LE(bit_width_, 64);
+    DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH);
   }
 
   RleDecoder() : bit_width_(-1) {}
 
   void Reset(uint8_t* buffer, int buffer_len, int bit_width) {
     DCHECK_GE(bit_width, 0);
-    DCHECK_LE(bit_width, 64);
+    DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH);
     bit_reader_.Reset(buffer, buffer_len);
     bit_width_ = bit_width;
     current_value_ = 0;
@@ -262,8 +262,7 @@ inline bool RleDecoder::Get(T* val) {
     --repeat_count_;
   } else {
     DCHECK_GT(literal_count_, 0);
-    bool result = bit_reader_.GetValue(bit_width_, val);
-    DCHECK(result);
+    if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false;
     --literal_count_;
   }
 
@@ -275,8 +274,7 @@ bool RleDecoder::NextCounts() {
   // Read the next run's indicator int, it could be a literal or repeated run.
   // The int is encoded as a vlq-encoded value.
   int32_t indicator_value = 0;
-  bool result = bit_reader_.GetVlqInt(&indicator_value);
-  if (!result) return false;
+  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false;
 
   // lsb indicates if it is a literal run or repeated run
   bool is_literal = indicator_value & 1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index b3dc5b7..fd429eb 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -31,7 +31,7 @@
 
 namespace impala {
 
-const int MAX_WIDTH = 32;
+const int MAX_WIDTH = BitReader::MAX_BITWIDTH;
 
 TEST(BitArray, TestBool) {
   const int len = 8;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/testdata/workloads/functional-query/queries/QueryTest/parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index e6b4061..a449162 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -28,7 +28,7 @@ Invalid metadata size in file footer
 # Parquet file with invalid column dict_page_offset.
 SELECT * from bad_dict_page_offset
 ---- CATCH
-Column 0 has invalid column offsets (offset=10000, size=47, file_size=249)
+Column 0 has invalid data page offset (offset=100001 file_size=249)
 ====
 ---- QUERY
 # Parquet file with invalid column total_compressed_size.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 8863995..4fcce64 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -512,6 +512,16 @@ class ImpalaTestSuite(BaseTestSuite):
     self.hive_client.drop_table(db_name, table_name, True)
     self.hive_client.create_table(table)
 
+  def _get_table_location(self, table_name, vector):
+    """ Returns the HDFS location of the table """
+    result = self.execute_query_using_client(self.client,
+        "describe formatted %s" % table_name, vector)
+    for row in result.data:
+      if 'Location:' in row:
+        return row.split('\t')[1]
+    # This should never happen.
+    assert 0, 'Unable to get location for table: ' + table_name
+
   def run_stmt_in_hive(self, stmt):
     """
     Run a statement in Hive, returning stdout if successful and throwing

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 8e78670..4cfbcb0 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -117,15 +117,6 @@ class TestUnmatchedSchema(ImpalaTestSuite):
     cls.TestMatrix.add_constraint(\
         lambda v: v.get_value('table_format').file_format != 'avro')
 
-  def _get_table_location(self, table_name, vector):
-    result = self.execute_query_using_client(self.client,
-        "describe formatted %s" % table_name, vector)
-    for row in result.data:
-      if 'Location:' in row:
-        return row.split('\t')[1]
-    # This should never happen.
-    assert 0, 'Unable to get location for table: ' + table_name
-
   def _create_test_table(self, vector):
     """
     Creates the test table

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
new file mode 100644
index 0000000..ae17572
--- /dev/null
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+import random
+import shutil
+import tempfile
+import time
+from subprocess import check_call
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+
+# Random fuzz testing of HDFS scanners. Existing tables for any HDFS file format
+# are corrupted in random ways to flush out bugs with handling of corrupted data.
+class TestScannersFuzzing(ImpalaTestSuite):
+  # Test a range of batch sizes to exercise different corner cases.
+  BATCH_SIZES = [0, 1, 16, 10000]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScannersFuzzing, cls).add_test_dimensions()
+    # TODO: enable for more table formats once they consistently pass the fuzz test.
+    cls.TestMatrix.add_constraint(lambda v:\
+        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        (v.get_value('table_format').file_format == 'text'
+            and v.get_value('table_format').compression_type == 'none'))
+
+  def test_fuzz_alltypes(self, vector, unique_database):
+    self.run_fuzz_test(vector, unique_database, "alltypes")
+
+  def test_fuzz_decimal_tbl(self, vector, unique_database):
+    table_format = vector.get_value('table_format')
+    table_name = "decimal_tbl"
+    if table_format.file_format in ('avro'):
+      table_name = "avro_decimal_tbl"
+      if table_format.compression_codec != 'block' or \
+          table_format.compression_type != 'snap':
+        pytest.skip()
+
+    self.run_fuzz_test(vector, unique_database, table_name, 10)
+
+  def test_fuzz_nested_types(self, vector, unique_database):
+    table_format = vector.get_value('table_format')
+    if table_format.file_format != 'parquet': pytest.skip()
+    self.run_fuzz_test(vector, unique_database, "complextypestbl", 10)
+
+  # TODO: add test coverage for additional data types like char and varchar
+
+  def run_fuzz_test(self, vector, unique_database, table, num_copies=1):
+    """ Do some basic fuzz testing: create a copy of an existing table with randomly
+    corrupted files and make sure that we don't crash or behave in an unexpected way.
+    'unique_database' is used for the table, so it will be cleaned up automatically.
+    If 'num_copies' is set, create that many corrupted copies of each input file.
+    SCANNER_FUZZ_SEED can be set in the environment to reproduce the result (assuming that
+    input files are the same).
+    SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the generated files.
+    """
+    # Create and seed a new random number generator for reproducibility.
+    rng = random.Random()
+    random_seed = os.environ.get("SCANNER_FUZZ_SEED") or time.time()
+    LOG.info("Using random seed %d", random_seed)
+    rng.seed(long(random_seed))
+
+    table_format = vector.get_value('table_format')
+    self.change_database(self.client, table_format)
+
+    tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % table,
+        dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
+
+    self.execute_query("create table %s.%s like %s" % (unique_database, table, table))
+    fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
+        unique_database, table))
+
+    LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
+        table, unique_database, tmp_table_dir)
+
+    # Find the location of the existing table and get the full table directory structure.
+    table_loc = self._get_table_location(table, vector)
+    check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
+
+    partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
+    for partition in partitions:
+      self.execute_query('alter table {0}.{1} add partition ({2})'.format(
+          unique_database, table, ','.join(partition)))
+
+    # Copy all of the local files and directories to hdfs.
+    to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
+               for file_or_dir in os.listdir(tmp_table_dir)]
+    check_call(['hdfs', 'dfs', '-copyFromLocal'] + to_copy + [fuzz_table_location])
+
+    if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
+      shutil.rmtree(tmp_table_dir)
+
+    # Querying the corrupted files should not DCHECK or crash.
+    self.execute_query("refresh %s.%s" % (unique_database, table))
+    # Execute a query that tries to read all the columns and rows in the file.
+    # Also execute a count(*) that materializes no columns, since different code
+    # paths are exercised.
+    # Use abort_on_error=0 to ensure we scan all the files.
+    queries = [
+        'select count(*) from (select distinct * from {0}.{1}) q'.format(
+            unique_database, table),
+        'select count(*) from {0}.{1} q'.format(unique_database, table)]
+
+    xfail_msgs = []
+    for query in queries:
+      for batch_size in self.BATCH_SIZES:
+        query_options = {'abort_on_error': '0', 'batch_size': batch_size}
+        try:
+          result = self.execute_query(query, query_options = query_options)
+          LOG.info('\n'.join(result.log))
+        except Exception as e:
+          if 'memory limit exceeded' in str(e).lower():
+            # Memory limit error should fail query.
+            continue
+          msg = "Should not throw error when abort_on_error=0: '{0}'".format(e)
+          LOG.error(msg)
+          # Parquet fails the query for some parse errors.
+          if table_format.file_format == 'parquet':
+            xfail_msgs.append(msg)
+          else:
+            raise
+    if len(xfail_msgs) != 0:
+      pytest.xfail('\n'.join(xfail_msgs))
+
+  def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng):
+    """ Walks a local copy of a HDFS table directory. Returns a list of partitions, each
+    as a list of "key=val" pairs. Ensures there is 'num_copies' copies of each file,
+    and corrupts each of the copies.
+    """
+    partitions = []
+    # Iterate over the partitions and files we downloaded.
+    for subdir, dirs, files in os.walk(tmp_table_dir):
+      if '_impala_insert_staging' in subdir: continue
+      if len(dirs) != 0: continue # Skip non-leaf directories
+
+      rel_subdir = os.path.relpath(subdir, tmp_table_dir)
+      if rel_subdir != ".":
+        # Create metadata for any directory partitions.
+        partitions.append(self.partitions_from_path(rel_subdir))
+
+      # Corrupt all of the files that we find.
+      for filename in files:
+        filepath = os.path.join(subdir, filename)
+        copies = [filepath]
+        for copy_num in range(1, num_copies):
+          copypath = os.path.join(subdir, "copy{0}_{1}".format(copy_num, filename))
+          shutil.copyfile(filepath, copypath)
+          copies.append(copypath)
+        for filepath in copies:
+          self.corrupt_file(filepath, rng)
+    return partitions
+
+  def partitions_from_path(self, relpath):
+    """ Return a list of "key=val" parts from partitions inferred from the directory path.
+    """
+    reversed_partitions = []
+    while relpath != '':
+      relpath, suffix  = os.path.split(relpath)
+      reversed_partitions.append(suffix)
+    return reversed(reversed_partitions)
+
+  def corrupt_file(self, path, rng):
+    """ Corrupt the file at 'path' in the local file system in a randomised way using the
+    random number generator 'rng'. Rewrites the file in-place.
+    Logs a message to describe how the file was corrupted, so the error is reproducible.
+    """
+    with open(path, "rb") as f:
+      data = bytearray(f.read())
+
+    if rng.random() < 0.5:
+      flip_offset = rng.randint(0, len(data) - 1)
+      flip_val = rng.randint(0, 255)
+      LOG.info("corrupt_file: Flip byte in %s at %d from %d to %d", path, flip_offset,
+          data[flip_offset], flip_val)
+      data[flip_offset] = flip_val
+    else:
+      truncation = rng.randint(0, len(data))
+      LOG.info("corrupt_file: Truncate %s to %d", path, truncation)
+      data = data[:truncation]
+
+    with open(path, "wb") as f:
+      f.write(data)
+



[2/2] incubator-impala git commit: IMPALA-3969: stress test: add option to set common query options

Posted by ta...@apache.org.
IMPALA-3969: stress test: add option to set common query options

It can be useful for debugging purposes to run the stress test with
custom query options, for example with codegen disabled. This patch adds
a command line option to the stress test entry point that allows a
caller to set query options.

To reduce the maintenance burden as new query options are added, we allow
freeform, space-delimited option=value arguments as the option's value, like:

 --common-query-options option1=value1 ... optionN=valueN

This means we don't do much validation that these options and values are
well-formed.  Callers must take care to type the correct options and
values.

Testing: I ran concurrent_select.py by hand against an Impala cluster
with codegen both enabled and disabled, à la

 --common-query-options DISABLE_CODEGEN=true

Both the log written by concurrent_select.py and the Impala logs on the
cluster indicated DISABLE_CODEGEN was being set as I directed. I also
did negative testing for a few bad --common-query-options values. In each
case, either concurrent_select.py catches the error, or the error is
reported when the first query runs.

Change-Id: Iada041aace60c218a12178d8f1b9a68ff29de72e
Reviewed-on: http://gerrit.cloudera.org:8080/3887
Reviewed-by: David Knupp <dk...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9162d5d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9162d5d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9162d5d0

Branch: refs/heads/master
Commit: 9162d5d0544bf080da0a86331e9f77fc56803678
Parents: 5afd9f7
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Aug 10 08:47:25 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Aug 11 22:04:03 2016 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 50 +++++++++++++++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9162d5d0/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index e7ed96b..dc06503 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -66,7 +66,7 @@ from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
 from random import choice, random, randrange
-from sys import maxint
+from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
@@ -93,6 +93,7 @@ MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
 # The version of the file format containing the collected query runtime info.
 RUNTIME_INFO_FILE_VERSION = 2
 
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -267,6 +268,7 @@ class StressRunner(object):
 
   def __init__(self):
     self.use_kerberos = False
+    self.common_query_options = {}
     self._mem_broker = None
 
     # Synchronized blocking work queue for producer/consumers.
@@ -539,6 +541,7 @@ class StressRunner(object):
     runner.impalad = impalad
     runner.result_hash_log_dir = self.result_hash_log_dir
     runner.use_kerberos = self.use_kerberos
+    runner.common_query_options = self.common_query_options
     runner.connect()
 
     while not self._query_queue.empty():
@@ -674,6 +677,7 @@ class Query(object):
     self.required_mem_mb_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
+    self.common_query_options = {}
 
   def __repr__(self):
     return dedent("""
@@ -717,6 +721,9 @@ class QueryRunner(object):
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
+        for query_option, value in self.common_query_options.iteritems():
+          cursor.execute(
+              "SET {query_option}={value}".format(query_option=query_option, value=value))
         cursor.execute("SET ABORT_ON_ERROR=1")
         LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
         cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
@@ -1355,6 +1362,12 @@ def main():
   parser.add_argument("--nlj-filter", choices=("in", "out", None),
       help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries"
       " will be used. The default is to not filter either way.")
+  parser.add_argument(
+      "--common-query-options", default=None, nargs="*",
+      help="Space-delimited string of query options and values. This is a freeform "
+      "string with little regard to whether you've spelled the query options correctly "
+      "or set valid values. Example: --common-query-options "
+      "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
 
   cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file,
@@ -1366,6 +1379,40 @@ def main():
     raise Exception("At least one of --tpcds-db, --tpch-db,"
         "--tpch-nested-db, --random-db, --query-file-path is required")
 
+  # The stress test sets these, so callers cannot override them.
+  IGNORE_QUERY_OPTIONS = frozenset([
+      'ABORT_ON_ERROR',
+      'MEM_LIMIT',
+  ])
+
+  common_query_options = {}
+  if args.common_query_options is not None:
+    for query_option_and_value in args.common_query_options:
+      try:
+        query_option, value = query_option_and_value.split('=')
+      except ValueError:
+        LOG.error(
+            "Could not parse --common-query-options: '{common_query_options}'".format(
+                common_query_options=args.common_query_options))
+        exit(1)
+      query_option = query_option.upper()
+      if query_option in common_query_options:
+        LOG.error(
+            "Query option '{query_option}' already defined in --common-query-options: "
+            "'{common_query_options}'".format(
+                query_option=query_option,
+                common_query_options=args.common_query_options))
+        exit(1)
+      elif query_option in IGNORE_QUERY_OPTIONS:
+        LOG.warn(
+            "Ignoring '{query_option}' in common query options: '{opt}': "
+            "The stress test algorithm needs control of this option.".format(
+                query_option=query_option, opt=args.common_query_options))
+      else:
+        common_query_options[query_option] = value
+        LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+            query_option=query_option, value=value))
+
   cluster = cli_options.create_cluster(args)
   cluster.is_kerberized = args.use_kerberos
   impala = cluster.impala
@@ -1491,6 +1538,7 @@ def main():
   stress_runner.cancel_probability = args.cancel_probability
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
+  stress_runner.common_query_options = common_query_options
   stress_runner.run_queries(queries, impala, args.max_queries, args.mem_overcommit_pct,
       not args.no_status)   # This is the value of 'should_print_status'.