Posted to commits@impala.apache.org by ph...@apache.org on 2018/05/22 03:33:56 UTC

[1/4] impala git commit: IMPALA-6998: test_bloom_wait_time fails due to late arrival of filters on Isilon

Repository: impala
Updated Branches:
  refs/heads/2.x 75d19c874 -> fb876f7e3


IMPALA-6998: test_bloom_wait_time fails due to late arrival of filters on Isilon

This test has been failing on Isilon runs, most likely due to timing issues,
which makes it a test issue rather than a product bug.

This patch disables the test for Isilon. We should revisit what tests we run
on non-HDFS filesystems later on, but until then, this should unblock
the build.

Change-Id: I2df6983a65a50b7efdd482124b70f518ee4c3229
Reviewed-on: http://gerrit.cloudera.org:8080/10366
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fb876f7e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fb876f7e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fb876f7e

Branch: refs/heads/2.x
Commit: fb876f7e3b3f441760dbad972d56a863401a2437
Parents: 9116423
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Thu May 10 09:16:51 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon May 21 20:10:17 2018 +0000

----------------------------------------------------------------------
 tests/query_test/test_runtime_filters.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fb876f7e/tests/query_test/test_runtime_filters.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 14f5884..38fd7a5 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -22,14 +22,17 @@ import time
 
 from tests.common.environ import specific_build_type_timeout
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfLocal
+from tests.common.skip import SkipIfLocal, SkipIfIsilon
 
 WAIT_TIME_MS = specific_build_type_timeout(60000, slow_build_timeout=100000)
 
 # Some of the queries in runtime_filters consume a lot of memory, leading to
 # significant memory reservations in parallel tests.
+# Skipping Isilon due to IMPALA-6998. TODO: Remove when there's a holistic revamp of
+# what tests to run for non-HDFS platforms
 @pytest.mark.execute_serially
 @SkipIfLocal.multiple_impalad
+@SkipIfIsilon.jira(reason="IMPALA-6998")
 class TestRuntimeFilters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):


[2/4] impala git commit: Moving default sanitizer options into init.cc from shell scripts.

Posted by ph...@apache.org.
Moving default sanitizer options into init.cc from shell scripts.

When running tests with ASAN, you need to set ASAN_OPTIONS explicitly
to avoid various failures. In particular, backend tests fail complaining
about memory leaks, and tests that use the parquet-reader binary complain
similarly. It turns out that we can shove the default options into our
code base directly, avoiding the need for users to set them explicitly.

I've done the same thing for TSAN and UBSAN.

I've manually checked that these are being read. In the UBSAN case, I
checked both with gdb and with inotifywatch on the suppressions file.
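
For reference, a minimal standalone sketch of the mechanism this patch relies on
(not Impala code; the file name and build line are assumptions): when a binary is
built with -fsanitize=address, the ASan runtime calls __asan_default_options() at
startup if that symbol is defined, and any flags set in the ASAN_OPTIONS
environment variable still override the compiled-in defaults.

// Hypothetical standalone example. Build with something like:
//   clang++ -fsanitize=address asan_defaults.cc -o asan_defaults
#include <cstdlib>

// The ASan runtime consults this symbol at startup; the returned string
// supplies default flag values. ASAN_OPTIONS in the environment overrides them.
extern "C" const char* __asan_default_options() {
  return "detect_leaks=0 allocator_may_return_null=1";
}

int main() {
  // With detect_leaks=0 baked in, this deliberate leak is not reported at exit.
  void* leaked = std::malloc(64);
  (void)leaked;
  return 0;
}

Running the same binary with ASAN_OPTIONS=detect_leaks=1 re-enables the leak
report, which is the override behavior the code comments below describe.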

Change-Id: I3cbbd210c67750a48003f336bea1f3e1cb2d9e6b
Reviewed-on: http://gerrit.cloudera.org:8080/10404
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2caec900
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2caec900
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2caec900

Branch: refs/heads/2.x
Commit: 2caec900502e9991f53154268b6d2b52f5c78ea2
Parents: 69e88f7
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon May 14 20:36:57 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon May 21 20:10:17 2018 +0000

----------------------------------------------------------------------
 be/CMakeLists.txt        |  2 +-
 be/src/common/init.cc    | 26 ++++++++++++++++++++++++++
 bin/run-backend-tests.sh |  4 +---
 bin/start-catalogd.sh    |  4 ----
 bin/start-impalad.sh     |  4 ----
 bin/start-statestored.sh |  4 ----
 6 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index cc5a597..2ef7b85 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -112,7 +112,7 @@ SET(CXX_FLAGS_ADDRESS_SANITIZER
 
 # Set the flags to the undefined behavior sanitizer, also known as "ubsan"
 # Turn on sanitizer and debug symbols to get stack traces:
-SET(CXX_FLAGS_UBSAN "${CXX_CLANG_FLAGS} -ggdb3 -fno-omit-frame-pointer -fsanitize=undefined")
+SET(CXX_FLAGS_UBSAN "${CXX_CLANG_FLAGS} -ggdb3 -fno-omit-frame-pointer -fsanitize=undefined -DUNDEFINED_SANITIZER")
 # Add flags to enable symbol resolution in the stack traces:
 SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -rtlib=compiler-rt -lgcc_s")
 # Ignore a number of noisy errors with too many false positives:

http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index 745112a..9daaf5c 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -274,3 +274,29 @@ Status impala::StartMemoryMaintenanceThread() {
   return Thread::Create("common", "memory-maintenance-thread",
       &MemoryMaintenanceThread, &memory_maintenance_thread);
 }
+
+#if defined(ADDRESS_SANITIZER)
+// Default ASAN_OPTIONS. Override by setting environment variable $ASAN_OPTIONS.
+extern "C" const char *__asan_default_options() {
+  // IMPALA-2746: backend tests don't pass with leak sanitizer enabled.
+  return "handle_segv=0 detect_leaks=0 allocator_may_return_null=1";
+}
+#endif
+
+#if defined(THREAD_SANITIZER)
+// Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
+extern "C" const char *__tsan_default_options() {
+  // Note that backend test should re-configure to halt_on_error=1
+  return "halt_on_error=0 history_size=7";
+}
+#endif
+
+// Default UBSAN_OPTIONS. Override by setting environment variable $UBSAN_OPTIONS.
+#if defined(UNDEFINED_SANITIZER)
+extern "C" const char *__ubsan_default_options() {
+  static const string default_options = Substitute(
+      "print_stacktrace=1 suppressions=$0/bin/ubsan-suppressions.txt",
+      getenv("IMPALA_HOME") == nullptr ? "." : getenv("IMPALA_HOME"));
+  return default_options.c_str();
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/bin/run-backend-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index d4d3142..3bc84c2 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -37,9 +37,7 @@ cd ${IMPALA_BE_DIR}
 cd ..
 
 export CTEST_OUTPUT_ON_FAILURE=1
-export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
-export UBSAN_OPTIONS="print_stacktrace=1"
-UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
+# Override default TSAN_OPTIONS so that halt_on_error is set.
 export TSAN_OPTIONS="halt_on_error=1 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 "${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"

http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/bin/start-catalogd.sh
----------------------------------------------------------------------
diff --git a/bin/start-catalogd.sh b/bin/start-catalogd.sh
index 4ec6846..a8b7e28 100755
--- a/bin/start-catalogd.sh
+++ b/bin/start-catalogd.sh
@@ -70,9 +70,5 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
 fi
 
 . ${IMPALA_HOME}/bin/set-classpath.sh
-export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
-export UBSAN_OPTIONS="print_stacktrace=1"
-UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
-export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/catalog/catalogd ${CATALOGD_ARGS}

http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/bin/start-impalad.sh
----------------------------------------------------------------------
diff --git a/bin/start-impalad.sh b/bin/start-impalad.sh
index 76a5f2c..f052fa2 100755
--- a/bin/start-impalad.sh
+++ b/bin/start-impalad.sh
@@ -98,9 +98,5 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
 fi
 
 . ${IMPALA_HOME}/bin/set-classpath.sh
-export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
-export UBSAN_OPTIONS="print_stacktrace=1"
-UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
-export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${TOOL_PREFIX} ${IMPALA_CMD} ${IMPALAD_ARGS}

http://git-wip-us.apache.org/repos/asf/impala/blob/2caec900/bin/start-statestored.sh
----------------------------------------------------------------------
diff --git a/bin/start-statestored.sh b/bin/start-statestored.sh
index 02cf09f..f023810 100755
--- a/bin/start-statestored.sh
+++ b/bin/start-statestored.sh
@@ -59,9 +59,5 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
   fi
 fi
 
-export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
-export UBSAN_OPTIONS="print_stacktrace=1"
-UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
-export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/statestore/statestored ${STATESTORED_ARGS}


[3/4] impala git commit: IMPALA-6070: Adding ASAN, --tail to test-with-docker.

Posted by ph...@apache.org.
IMPALA-6070: Adding ASAN, --tail to test-with-docker.

* Adds -ASAN suites to test-with-docker.
* Adds --tail flag, which starts a tail subprocess. This
  isn't pretty (there's potential for overlap), but it's a dead simple
  way to keep an eye on what's going on.
* Fixes a bug wherein I could call "docker rm <container>" twice
  simultaneously, which would make Docker fail the second call,
  and then fail the related "docker rmi". It's better to serialize,
  and I did that with a simple lock.
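
The removal race is a check-then-act problem: two threads can both see that the
container has not been removed and both issue "docker rm". A minimal sketch of the
serialize-with-a-lock fix, written in C++ purely for illustration (the struct, the
names, and the printf standing in for the docker call are all assumptions; the
actual change is the threading.Lock added to _rm_container() in the diff below):

#include <cstdio>
#include <mutex>
#include <string>
#include <thread>

struct Container {
  std::string id;
  bool removed = false;
  std::mutex lock;  // Serializes concurrent removal attempts on this container.
};

void RemoveContainer(Container* c) {
  std::lock_guard<std::mutex> guard(c->lock);
  if (c->removed) return;  // The second caller waits on the lock, then no-ops here.
  // The real script shells out to "docker rm <id>"; printing stands in for that.
  std::printf("docker rm %s\n", c->id.c_str());
  c->removed = true;
}

int main() {
  Container c;
  c.id = "fb876f7e";
  std::thread t1(RemoveContainer, &c);
  std::thread t2(RemoveContainer, &c);
  t1.join();
  t2.join();
  return 0;
}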

Change-Id: I51451cdf1352fc0f9516d729b9a77700488d993f
Reviewed-on: http://gerrit.cloudera.org:8080/10319
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9116423a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9116423a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9116423a

Branch: refs/heads/2.x
Commit: 9116423a76ca1a11fdd440d20f6fd14700bf9df9
Parents: 2caec90
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Thu Apr 26 21:10:37 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon May 21 20:10:17 2018 +0000

----------------------------------------------------------------------
 docker/entrypoint.sh       | 11 +++++++++--
 docker/test-with-docker.py | 44 ++++++++++++++++++++++++++++++++++++-----
 2 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9116423a/docker/entrypoint.sh
----------------------------------------------------------------------
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index 1243e28..37809cd 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -183,7 +183,9 @@ function build_impdev() {
   # Note that IMPALA-6494 prevents us from using shared library builds,
   # which are smaller and thereby speed things up. We use "-notests"
   # to avoid building backend tests, which are sizable, and
-  # can be built when executing those tests.
+  # can be built when executing those tests. We use "-noclean" to
+  # avoid deleting the log for this invocation which is in logs/,
+  # and, this is a first build anyway.
   ./buildall.sh -noclean -format -testdata -notests
 
   # Dump current memory usage to logs, before shutting things down.
@@ -257,8 +259,13 @@ function test_suite() {
   boot_container
   impala_environment
 
+  if [[ ${REBUILD_ASAN:-false} = true ]]; then
+    # Note: we're not redoing data loading.
+    SKIP_TOOLCHAIN_BOOTSTRAP=true ./buildall.sh -noclean -notests -asan
+  fi
+
   # BE tests don't require the minicluster, so we can run them directly.
-  if [[ $1 = BE_TEST ]]; then
+  if [[ $1 = BE_TEST* ]]; then
     make -j$(nproc) --load-average=$(nproc) be-test be-benchmarks
     if ! bin/run-backend-tests.sh; then
       echo "Tests $1 failed!"

http://git-wip-us.apache.org/repos/asf/impala/blob/9116423a/docker/test-with-docker.py
----------------------------------------------------------------------
diff --git a/docker/test-with-docker.py b/docker/test-with-docker.py
index fa230e8..1922c57 100755
--- a/docker/test-with-docker.py
+++ b/docker/test-with-docker.py
@@ -113,6 +113,7 @@ import re
 import subprocess
 import sys
 import tempfile
+import threading
 import time
 
 if __name__ == '__main__' and __package__ is None:
@@ -178,6 +179,8 @@ def main():
   parser.add_argument('--ccache-dir', metavar='DIR',
                       help="CCache directory to use",
                       default=os.path.expanduser("~/.ccache"))
+  parser.add_argument('--tail', action="store_true",
+      help="Run tail on all container log files.")
   parser.add_argument('--test', action="store_true")
   args = parser.parse_args()
 
@@ -193,7 +196,8 @@ def main():
       cleanup_image=args.cleanup_image, ccache_dir=args.ccache_dir, test_mode=args.test,
       parallel_test_concurrency=args.parallel_test_concurrency,
       suite_concurrency=args.suite_concurrency,
-      impalad_mem_limit_bytes=args.impalad_mem_limit_bytes)
+      impalad_mem_limit_bytes=args.impalad_mem_limit_bytes,
+      tail=args.tail)
 
   fh = logging.FileHandler(os.path.join(_make_dir_if_not_exist(t.log_dir), "log.txt"))
   fh.setFormatter(logging.Formatter(LOG_FORMAT))
@@ -296,6 +300,12 @@ class Suite(object):
     r.timeout_minutes = 240
     return r
 
+  def asan(self):
+    """Returns an ASAN copy of this suite."""
+    r = self.copy(self.name + "_ASAN", REBUILD_ASAN="true")
+    r.timeout_minutes = self.timeout_minutes * 2.0 + 10
+    return r
+
   def sharded(self, shards):
     """Returns a list of sharded copies of the list.
 
@@ -311,6 +321,7 @@ class Suite(object):
     return ret
 
 # Definitions of all known suites:
+be_test = Suite("BE_TEST")
 ee_test_serial = Suite("EE_TEST_SERIAL", EE_TEST="true",
     RUN_TESTS_ARGS="--skip-parallel --skip-stress")
 ee_test_serial.shard_at_concurrency = 4
@@ -339,6 +350,13 @@ OTHER_SUITES = [
     ee_test_parallel_exhaustive,
     ee_test_serial_exhaustive,
     cluster_test_exhaustive,
+
+    # ASAN
+    be_test.asan(),
+    cluster_test.asan(),
+    ee_test_parallel.asan(),
+    ee_test_serial.asan(),
+
     Suite("RAT_CHECK"),
     # These are used for testing this script
     Suite("NOOP"),
@@ -388,6 +406,9 @@ class Container(object):
     self.end = None
     self.removed = False
 
+    # Protects multiple calls to "docker rm <self.id>"
+    self.lock = threading.Lock()
+
     # Updated by Timeline class
     self.total_user_cpu = -1
     self.total_system_cpu = -1
@@ -409,7 +430,7 @@ class TestWithDocker(object):
   def __init__(self, build_image, suite_names, name, cleanup_containers,
                cleanup_image, ccache_dir, test_mode,
                suite_concurrency, parallel_test_concurrency,
-               impalad_mem_limit_bytes):
+               impalad_mem_limit_bytes, tail):
     self.build_image = build_image
     self.name = name
     self.containers = []
@@ -442,6 +463,7 @@ class TestWithDocker(object):
     self.suite_concurrency = suite_concurrency
     self.parallel_test_concurrency = parallel_test_concurrency
     self.impalad_mem_limit_bytes = impalad_mem_limit_bytes
+    self.tail = tail
 
     # Map suites back into objects; we ignore case for this mapping.
     suites = []
@@ -518,8 +540,13 @@ class TestWithDocker(object):
     run through annotate.py to add timestamps and saved into the container's log file.
     """
     container.running = True
+    tailer = None
+
+    with open(container.logfile, "aw") as log_output:
+      if self.tail:
+        tailer = subprocess.Popen(
+            ["tail", "-f", "--pid", str(os.getpid()), "-v", container.logfile])
 
-    with file(container.logfile, "w") as log_output:
       container.start = time.time()
       # Sets up a "docker start ... | annotate.py > logfile" pipeline using
       # subprocess.
@@ -542,6 +569,8 @@ class TestWithDocker(object):
       container.exitcode = ret
       container.running = False
       container.end = time.time()
+      if tailer:
+        tailer.kill()
       return ret == 0
 
   @staticmethod
@@ -555,8 +584,13 @@ class TestWithDocker(object):
   @staticmethod
   def _rm_container(container):
     """Removes container."""
-    if not container.removed:
-      _call(["docker", "rm", container.id], check=False)
+    # We can have multiple threads trying to call "docker rm" on a container.
+    # Docker will fail one of those with "already running", but we actually
+    # want to block until it's removed. Using a lock to serialize the "docker # rm"
+    # calls handles that.
+    with container.lock:
+      if not container.removed:
+        _call(["docker", "rm", container.id], check=False)
       container.removed = True
 
   def _create_build_image(self):


[4/4] impala git commit: IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners

Posted by ph...@apache.org.
IMPALA-3833: Fix invalid data handling in Sequence and RCFile scanners

Introduced a new error message when scanning a corrupt Sequence or RCFile.
Added new checks to detect buffer overruns while handling Sequence and RCFiles.

Testing:
  a) Made changes to the fuzz test for RCFile/Sequence files and ran it in a loop
      with 200 iterations without failure.

  b) Ran exhaustive tests on the changes without failure.
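
The buffer-overrun checks all follow one pattern: each variable-length-integer
read now takes the number of bytes remaining in the buffer and returns -1 instead
of reading past the end (see the GetVLong()/GetVInt() changes in read-write-util.h
below). A minimal self-contained sketch of that pattern, using an unsigned LEB128
varint purely for illustration (Impala's GetVLong() decodes Hadoop-style
zero-compressed longs, per the tests in read-write-util-test.cc, not LEB128):

#include <cstdint>
#include <cstdio>

// Returns the number of bytes consumed, or -1 if decoding would read past the
// 'size' bytes the caller says are available.
static int GetBoundedVarint(const uint8_t* buf, int32_t size, uint64_t* out) {
  const int kMaxLen = 10;  // 64 bits at 7 payload bits per byte.
  uint64_t value = 0;
  int shift = 0;
  for (int i = 0; i < kMaxLen; ++i) {
    if (i >= size) return -1;  // Would read beyond the caller-supplied buffer.
    uint8_t byte = buf[i];
    value |= static_cast<uint64_t>(byte & 0x7F) << shift;
    if ((byte & 0x80) == 0) {  // No continuation bit: the value is complete.
      *out = value;
      return i + 1;
    }
    shift += 7;
  }
  return -1;  // Malformed input: the continuation bits never terminated.
}

int main() {
  const uint8_t good[] = {0xAC, 0x02};  // 300 encoded in two bytes.
  const uint8_t truncated[] = {0xAC};   // The same value cut off after one byte.
  uint64_t v = 0;
  std::printf("good: %d bytes, value=%llu\n",
              GetBoundedVarint(good, sizeof(good), &v),
              static_cast<unsigned long long>(v));
  std::printf("truncated: %d\n", GetBoundedVarint(truncated, sizeof(truncated), &v));
  return 0;
}

Threading the remaining length through every decode call, as the patch does for
GetCurrentKeyBuffer() and GetRecord(), is what turns the previous DCHECKs into
recoverable Status errors on corrupt files.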

Change-Id: Ic9cfc38af3f30c65ada9734eb471dbfa6ecdd74a
Reviewed-on: http://gerrit.cloudera.org:8080/8936
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/69e88f70
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/69e88f70
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/69e88f70

Branch: refs/heads/2.x
Commit: 69e88f70f9fffad1086e3e66ebb38be15a2b1c67
Parents: 75d19c8
Author: Pranay <ps...@cloudera.com>
Authored: Wed Apr 18 18:01:56 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon May 21 20:10:17 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-rcfile-scanner.cc     | 153 ++++++++++++++++++++++------
 be/src/exec/hdfs-rcfile-scanner.h      |  16 ++-
 be/src/exec/hdfs-sequence-scanner.cc   |  79 ++++++++++----
 be/src/exec/hdfs-sequence-scanner.h    |   6 ++
 be/src/exec/read-write-util-test.cc    |  11 +-
 be/src/exec/read-write-util.h          |  32 ++++--
 be/src/exec/scanner-context.inline.h   |   5 +
 be/src/util/decompress.cc              |  10 ++
 tests/query_test/test_scanners_fuzz.py |  11 +-
 9 files changed, 255 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index a706c3d..37376c6 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -49,6 +49,9 @@ const char* const HdfsRCFileScanner::RCFILE_METADATA_KEY_NUM_COLS =
 
 const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1};
 
+// Check max column limit, set to 8 million
+const int HdfsRCFileScanner::MAX_NCOLS = 8000000;
+
 // Macro to convert between SerdeUtil errors to Status returns.
 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
 
@@ -82,6 +85,13 @@ Status HdfsRCFileScanner::InitNewRange() {
         reuse_row_group_buffer_, header_->codec, &decompressor_));
   }
 
+  int ncols = reinterpret_cast<RcFileHeader*>(header_)->num_cols;
+  if (ncols < 0 || ncols > MAX_NCOLS) {
+    stringstream ss;
+    ss << stream_->filename() << " Column limit has exceeded " << MAX_NCOLS
+       << " limit, the number of columns are " << ncols;
+    return Status(ss.str());
+  }
   // Allocate the buffers for the key information that is used to read and decode
   // the column data.
   columns_.resize(reinterpret_cast<RcFileHeader*>(header_)->num_cols);
@@ -116,7 +126,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     rc_header->version = RCF1;
   } else {
     stringstream ss;
-    ss << "Invalid RCFILE_VERSION_HEADER: '"
+    ss << stream_->filename() << " Invalid RCFILE_VERSION_HEADER: '"
        << ReadWriteUtil::HexDump(header, sizeof(RCFILE_VERSION_HEADER)) << "'";
     return Status(ss.str());
   }
@@ -130,9 +140,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     if (len != strlen(HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME) ||
         memcmp(class_name_key, HdfsRCFileScanner::RCFILE_KEY_CLASS_NAME, len)) {
       stringstream ss;
-      ss << "Invalid RCFILE_KEY_CLASS_NAME: '"
-         << string(reinterpret_cast<char*>(class_name_key), len)
-         << "' len=" << len;
+      ss << stream_->filename() << " Invalid RCFILE_KEY_CLASS_NAME: '"
+         << string(reinterpret_cast<char*>(class_name_key), len) << "' len=" << len;
       return Status(ss.str());
     }
 
@@ -142,9 +151,8 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     if (len != strlen(HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME) ||
         memcmp(class_name_val, HdfsRCFileScanner::RCFILE_VALUE_CLASS_NAME, len)) {
       stringstream ss;
-      ss << "Invalid RCFILE_VALUE_CLASS_NAME: '"
-         << string(reinterpret_cast<char*>(class_name_val), len)
-         << "' len=" << len;
+      ss << stream_->filename() << " Invalid RCFILE_VALUE_CLASS_NAME: '"
+         << string(reinterpret_cast<char*>(class_name_val), len) << "' len=" << len;
       return Status(ss.str());
     }
   }
@@ -161,7 +169,7 @@ Status HdfsRCFileScanner::ReadFileHeader() {
         stream_->ReadBoolean(&is_blk_compressed, &parse_status_));
     if (is_blk_compressed) {
       stringstream ss;
-      ss << "RC files do no support block compression.";
+      ss << stream_->filename() << " RC files does not support block compression.";
       return Status(ss.str());
     }
   }
@@ -172,7 +180,11 @@ Status HdfsRCFileScanner::ReadFileHeader() {
     RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
     header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
     Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
-    DCHECK(it != Codec::CODEC_MAP.end());
+    if (it == Codec::CODEC_MAP.end()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid codec: " << header_->codec;
+      return Status(ss.str());
+    }
     header_->compression_type = it->second;
   } else {
     header_->compression_type = THdfsCompression::NONE;
@@ -208,10 +220,10 @@ Status HdfsRCFileScanner::ReadNumColumnsMetadata() {
       StringParser::ParseResult result;
       int num_cols =
           StringParser::StringToInt<int>(value_str.c_str(), value_str.size(), &result);
-      if (result != StringParser::PARSE_SUCCESS) {
+      if (result != StringParser::PARSE_SUCCESS || num_cols < 0) {
         stringstream ss;
-        ss << "Could not parse number of columns in file " << stream_->filename()
-           << ": " << value_str;
+        ss << " Could not parse number of columns in file " << stream_->filename()
+           << " : " << value_str;
         if (result == StringParser::PARSE_OVERFLOW) ss << " (result overflowed)";
         return Status(ss.str());
       }
@@ -271,7 +283,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad record length: " << record_length << " at offset: " << position;
+    ss << stream_->filename() << " Bad record length: " << record_length
+       << " at offset: " << position;
     return Status(ss.str());
   }
   RETURN_IF_FALSE(stream_->ReadInt(&key_length_, &parse_status_));
@@ -279,7 +292,8 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad key length: " << key_length_ << " at offset: " << position;
+    ss << stream_->filename() << " Bad key length: " << key_length_
+       << " at offset: " << position;
     return Status(ss.str());
   }
   RETURN_IF_FALSE(stream_->ReadInt(&compressed_key_length_, &parse_status_));
@@ -287,7 +301,7 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad compressed key length: " << compressed_key_length_
+    ss << stream_->filename() << " Bad compressed key length: " << compressed_key_length_
        << " at offset: " << position;
     return Status(ss.str());
   }
@@ -316,42 +330,98 @@ Status HdfsRCFileScanner::ReadKeyBuffers() {
     memcpy(key_buffer, buffer, key_length_);
   }
 
-  row_group_length_ = 0;
   uint8_t* key_buf_ptr = key_buffer;
-  int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_);
+  row_group_length_ = 0;
+  int remain_len = key_length_;
+  int bytes_read = ReadWriteUtil::GetVInt(key_buf_ptr, &num_rows_, key_length_);
+  if (bytes_read == -1 || num_rows_ < 0) {
+    stringstream ss;
+    ss << stream_->filename() << " Bad row group key buffer, key length: " << key_length_;
+    return Status(ss.str());
+  }
   key_buf_ptr += bytes_read;
+  remain_len = remain_len - bytes_read;
 
+  // Track the starting position in the buffer.
+  uint8_t* start_key_buf_ptr = key_buf_ptr;
   for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
-    GetCurrentKeyBuffer(col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr);
-    DCHECK_LE(key_buf_ptr, key_buffer + key_length_);
+    if (key_buf_ptr < start_key_buf_ptr || (key_buf_ptr > key_buffer + key_length_)
+        || remain_len <= 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Bad row group key buffer, column idx: " << col_idx;
+      return Status(ss.str());
+    }
+    RETURN_IF_ERROR(GetCurrentKeyBuffer(
+        col_idx, !columns_[col_idx].materialize_column, &key_buf_ptr, remain_len));
+    remain_len = remain_len - (key_buf_ptr - start_key_buf_ptr);
+    start_key_buf_ptr = key_buf_ptr;
   }
-  DCHECK_EQ(key_buf_ptr, key_buffer + key_length_);
 
   return Status::OK();
 }
 
-void HdfsRCFileScanner::GetCurrentKeyBuffer(int col_idx, bool skip_col_data,
-                                            uint8_t** key_buf_ptr) {
+Status HdfsRCFileScanner::BadColumnInfo(int col_idx) {
+  stringstream ss;
+  ss << stream_->filename() << " Corrupt column at index: " << col_idx;
+  return Status(ss.str());
+}
+
+Status HdfsRCFileScanner::GetCurrentKeyBuffer(
+    int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length) {
   ColumnInfo& col_info = columns_[col_idx];
+  int remain_len = buf_length;
 
-  int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len);
+  if (remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
+
+  DCHECK_GT(remain_len, 0);
+  int bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.buffer_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  DCHECK_GT(remain_len, 0);
 
-  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len);
+  bytes_read =
+      ReadWriteUtil::GetVInt(*key_buf_ptr, &col_info.uncompressed_buffer_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  if (remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
 
   int col_key_buf_len;
-  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr , &col_key_buf_len);
+  bytes_read = ReadWriteUtil::GetVInt(*key_buf_ptr, &col_key_buf_len, remain_len);
+  if (bytes_read == -1) {
+    return BadColumnInfo(col_idx);
+  }
+
   *key_buf_ptr += bytes_read;
+  remain_len -= bytes_read;
+  if (col_info.uncompressed_buffer_len < 0 || remain_len <= 0) {
+    return BadColumnInfo(col_idx);
+  }
 
   if (!skip_col_data) {
     col_info.key_buffer = *key_buf_ptr;
 
+    DCHECK_GE(col_info.uncompressed_buffer_len, 0);
+
     // Set the offset for the start of the data for this column in the allocated buffer.
     col_info.start_offset = row_group_length_;
     row_group_length_ += col_info.uncompressed_buffer_len;
   }
+  col_info.buf_length = col_key_buf_len;
   *key_buf_ptr += col_key_buf_len;
+  remain_len -= bytes_read;
+  DCHECK_GE(remain_len, 0);
+
+  return Status::OK();
 }
 
 inline Status HdfsRCFileScanner::NextField(int col_idx) {
@@ -366,11 +436,11 @@ inline Status HdfsRCFileScanner::NextField(int col_idx) {
     int64_t length = 0;
     uint8_t* col_key_buf = col_info.key_buffer;
     int bytes_read = ReadWriteUtil::GetVLong(
-        col_key_buf, col_info.key_buffer_pos, &length);
+        col_key_buf, col_info.key_buffer_pos, &length, col_info.buf_length);
     if (bytes_read == -1) {
         int64_t position = stream_->file_offset();
         stringstream ss;
-        ss << "Invalid column length at offset: " << position;
+        ss << stream_->filename() << " Invalid column length at offset: " << position;
         return Status(ss.str());
     }
     col_info.key_buffer_pos += bytes_read;
@@ -403,6 +473,11 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
   for (int col_idx = 0; col_idx < columns_.size(); ++col_idx) {
     ColumnInfo& column = columns_[col_idx];
     if (!columns_[col_idx].materialize_column) {
+      if (column.buffer_len < 0) {
+        stringstream ss;
+        ss << stream_->filename() << " Bad column buffer len: " << column.buffer_len;
+        return Status(ss.str());
+      }
       // Not materializing this column, just skip it.
       RETURN_IF_FALSE(
           stream_->SkipBytes(column.buffer_len, &parse_status_));
@@ -411,7 +486,17 @@ Status HdfsRCFileScanner::ReadColumnBuffers() {
 
     // TODO: Stream through these column buffers instead of reading everything
     // in at once.
-    DCHECK_LE(column.uncompressed_buffer_len + column.start_offset, row_group_length_);
+    // Uncompressed buffer size for a column should not exceed the row_group_length_
+    // as row_group_length_ is a sum of uncompressed buffer length for all the columns
+    // so this check ensures that there is enough space in row_group_buffer for the
+    // uncompressed data.
+    if (column.uncompressed_buffer_len + column.start_offset > row_group_length_) {
+      stringstream ss;
+      ss << stream_->filename() << " Bad column buffer uncompressed buffer length: "
+         << column.uncompressed_buffer_len << " at offset " << column.start_offset;
+      return Status(ss.str());
+    }
+
     if (header_->is_compressed) {
       uint8_t* compressed_input;
       RETURN_IF_FALSE(stream_->ReadBytes(
@@ -493,8 +578,16 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
         const char* col_start = reinterpret_cast<const char*>(
             row_group_buffer_ + column.start_offset + column.buffer_pos);
         const int field_len = column.current_field_len;
-        DCHECK_LE(col_start + field_len,
-            reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_));
+        const char* row_group_end =
+            reinterpret_cast<const char*>(row_group_buffer_ + row_group_length_);
+        const char* col_end = col_start + field_len;
+        if (col_end > row_group_end || column.start_offset < 0 || column.buffer_pos < 0
+            || col_start > row_group_end || field_len < 0) {
+          stringstream ss;
+          ss << stream_->filename()
+             << " Bad column index at offset : " << column.start_offset;
+          return Status(ss.str());
+        }
 
         if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len,
             false, false, row_batch->tuple_data_pool())) {

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/hdfs-rcfile-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h
index 835d2a2..f35bcf8 100644
--- a/be/src/exec/hdfs-rcfile-scanner.h
+++ b/be/src/exec/hdfs-rcfile-scanner.h
@@ -257,6 +257,9 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   /// of the file {'R', 'C', 'F' 1}
   static const uint8_t RCFILE_VERSION_HEADER[4];
 
+  // Check max column limit
+  static const int MAX_NCOLS;
+
   /// Implementation of superclass functions.
   virtual FileHeader* AllocateFileHeader();
   virtual Status ReadFileHeader() WARN_UNUSED_RESULT;
@@ -289,12 +292,14 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   /// Input/Output:
   ///   key_buf_ptr: Pointer to the buffered file data, this will be moved
   ///                past the data for this column.
+  ///   buf_len: Length of the buffer that will be read.
   /// Sets:
   ///   col_buf_len_
   ///   col_buf_uncompressed_len_
   ///   col_key_bufs_
   ///   col_bufs_off_
-  void GetCurrentKeyBuffer(int col_idx, bool skip_col_data, uint8_t** key_buf_ptr);
+  Status GetCurrentKeyBuffer(
+      int col_idx, bool skip_col_data, uint8_t** key_buf_ptr, int buf_length);
 
   /// Read the rowgroup column buffers
   /// Sets:
@@ -323,6 +328,13 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
   ///   row_pos_
   Status NextRow() WARN_UNUSED_RESULT;
 
+  /// Error message printed on a formatted stream when a bad column idex is encountered.
+  /// Input:
+  ///   col_idx: column to print.
+  /// Output:
+  ///   Error status, with formatted stream.
+  Status BadColumnInfo(int col_idx);
+
   enum Version {
     SEQ6,     // Version for sequence file and pre hive-0.9 rc files
     RCF1      // The version post hive-0.9 which uses a new header
@@ -352,6 +364,8 @@ class HdfsRCFileScanner : public BaseSequenceScanner {
     int32_t key_buffer_len;
     /// This is a ptr into the scanner's key_buffer_ for this column.
     uint8_t* key_buffer;
+    /// Length of the key buffer
+    int32_t buf_length;
 
     /// Current position in the key buffer
     int32_t key_buffer_pos;

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 8a9151e..2183655 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -120,7 +120,7 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
     int64_t in_size = current_block_length_ - current_key_length_;
     if (in_size < 0) {
       stringstream ss;
-      ss << "Invalid record size: " << in_size;
+      ss << stream_->filename() << " Invalid record size: " << in_size;
       return Status(ss.str());
     }
     uint8_t* compressed_data;
@@ -136,15 +136,19 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr,
     }
     *record_ptr = unparsed_data_buffer_;
     // Read the length of the record.
-    int size = ReadWriteUtil::GetVLong(*record_ptr, record_len);
-    if (size == -1) return Status("Invalid record sizse");
+    int size = ReadWriteUtil::GetVLong(*record_ptr, record_len, in_size);
+    if (size == -1) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid record size: " << in_size;
+      return Status(ss.str());
+    }
     *record_ptr += size;
   } else {
     // Uncompressed records
     RETURN_IF_FALSE(stream_->ReadVLong(record_len, &parse_status_));
     if (*record_len < 0) {
       stringstream ss;
-      ss << "Invalid record length: " << *record_len;
+      ss << stream_->filename() << " Invalid record length: " << *record_len;
       return Status(ss.str());
     }
     RETURN_IF_FALSE(
@@ -199,9 +203,9 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch)
     if (sync_indicator != -1) {
       if (state_->LogHasSpace()) {
         stringstream ss;
-        ss << "Expecting sync indicator (-1) at file offset "
-            << (stream_->file_offset() - sizeof(int)) << ".  "
-            << "Sync indicator found " << sync_indicator << ".";
+        ss << stream_->filename() << " Expecting sync indicator (-1) at file offset "
+           << (stream_->file_offset() - sizeof(int)) << ".  "
+           << "Sync indicator found " << sync_indicator << ".";
         state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
       }
       return Status("Bad sync hash");
@@ -229,15 +233,34 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
   // Parse record starts and lengths
   int field_location_offset = 0;
   for (int i = 0; i < num_to_process; ++i) {
-    DCHECK_LT(i, record_locations_.size());
-    int bytes_read = ReadWriteUtil::GetVLong(
-        next_record_in_compressed_block_, &record_locations_[i].len);
+    if (i >= record_locations_.size() || record_locations_[i].len < 0
+        || next_record_in_compressed_block_ > data_buffer_end_) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
+    int bytes_read = ReadWriteUtil::GetVLong(next_record_in_compressed_block_,
+        &record_locations_[i].len, next_record_in_compressed_block_len_);
     if (UNLIKELY(bytes_read == -1)) {
-      return Status("Invalid record sizes in compressed block.");
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
     }
     next_record_in_compressed_block_ += bytes_read;
+    next_record_in_compressed_block_len_ -= bytes_read;
+    if (next_record_in_compressed_block_len_ <= 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
     record_locations_[i].record = next_record_in_compressed_block_;
     next_record_in_compressed_block_ += record_locations_[i].len;
+    next_record_in_compressed_block_len_ -= record_locations_[i].len;
+    if (next_record_in_compressed_block_len_ < 0) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
   }
 
   // Parse records to find field locations.
@@ -254,9 +277,17 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
           reinterpret_cast<char*>(record_locations_[i].record),
           &field_locations_[field_location_offset], &num_fields));
     }
-    DCHECK_EQ(num_fields, scan_node_->materialized_slots().size());
+    if (num_fields != scan_node_->materialized_slots().size()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
     field_location_offset += num_fields;
-    DCHECK_LE(field_location_offset, field_locations_.size());
+    if (field_location_offset > field_locations_.size()) {
+      stringstream ss;
+      ss << stream_->filename() << " Invalid compressed block";
+      return Status(ss.str());
+    }
   }
 
   int max_added_tuples = (scan_node_->limit() == -1) ?
@@ -377,7 +408,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
 
   if (memcmp(header, SEQFILE_VERSION_HEADER, sizeof(SEQFILE_VERSION_HEADER))) {
     stringstream ss;
-    ss << "Invalid SEQFILE_VERSION_HEADER: '"
+    ss << stream_->filename() << " Invalid SEQFILE_VERSION_HEADER: '"
        << ReadWriteUtil::HexDump(header, sizeof(SEQFILE_VERSION_HEADER)) << "'";
     return Status(ss.str());
   }
@@ -390,7 +421,7 @@ Status HdfsSequenceScanner::ReadFileHeader() {
   RETURN_IF_FALSE(stream_->ReadText(&class_name, &len, &parse_status_));
   if (memcmp(class_name, HdfsSequenceScanner::SEQFILE_VALUE_CLASS_NAME, len)) {
     stringstream ss;
-    ss << "Invalid SEQFILE_VALUE_CLASS_NAME: '"
+    ss << stream_->filename() << " Invalid SEQFILE_VALUE_CLASS_NAME: '"
        << string(reinterpret_cast<char*>(class_name), len) << "'";
     return Status(ss.str());
   }
@@ -408,7 +439,9 @@ Status HdfsSequenceScanner::ReadFileHeader() {
     RETURN_IF_FALSE(stream_->ReadText(&codec_ptr, &len, &parse_status_));
     header_->codec = string(reinterpret_cast<char*>(codec_ptr), len);
     Codec::CodecMap::const_iterator it = Codec::CODEC_MAP.find(header_->codec);
-    DCHECK(it != Codec::CODEC_MAP.end());
+    if (it == Codec::CODEC_MAP.end()) {
+      return Status(TErrorCode::COMPRESSED_FILE_BLOCK_CORRUPTED, header_->codec);
+    }
     header_->compression_type = it->second;
   } else {
     header_->compression_type = THdfsCompression::NONE;
@@ -449,7 +482,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad block length: " << current_block_length_ << " at offset " << position;
+    ss << stream_->filename() << " Bad block length: " << current_block_length_
+       << " at offset " << position;
     return Status(ss.str());
   }
 
@@ -458,7 +492,8 @@ Status HdfsSequenceScanner::ReadBlockHeader() {
     stringstream ss;
     int64_t position = stream_->file_offset();
     position -= sizeof(int32_t);
-    ss << "Bad key length: " << current_key_length_ << " at offset " << position;
+    ss << stream_->filename() << " Bad key length: " << current_key_length_
+       << " at offset " << position;
     return Status(ss.str());
   }
 
@@ -472,8 +507,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
   if (num_buffered_records < 0) {
     if (state_->LogHasSpace()) {
       stringstream ss;
-      ss << "Bad compressed block record count: "
-         << num_buffered_records;
+      ss << stream_->filename()
+         << " Bad compressed block record count: " << num_buffered_records;
       state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
     }
     return Status("bad record count");
@@ -493,7 +528,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
   // Check for a reasonable size
   if (block_size > MAX_BLOCK_SIZE || block_size < 0) {
     stringstream ss;
-    ss << "Compressed block size is: " << block_size;
+    ss << stream_->filename() << " Compressed block size is: " << block_size;
     return Status(ss.str());
   }
 
@@ -507,6 +542,8 @@ Status HdfsSequenceScanner::ReadCompressedBlock() {
                                                 &len, &unparsed_data_buffer_));
     VLOG_FILE << "Decompressed " << block_size << " to " << len;
     next_record_in_compressed_block_ = unparsed_data_buffer_;
+    next_record_in_compressed_block_len_ = len;
+    data_buffer_end_ = unparsed_data_buffer_ + len;
   }
   num_buffered_records_in_compressed_block_ = num_buffered_records;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 463ffc7..e84a5e7 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -251,10 +251,16 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// Buffer for data read from the 'stream_' directly or after decompression.
   uint8_t* unparsed_data_buffer_ = nullptr;
 
+  /// End of data buffer used to check out of bound error.
+  uint8_t* data_buffer_end_ = nullptr;
+
   /// Number of buffered records unparsed_data_buffer_ from block compressed data.
   int64_t num_buffered_records_in_compressed_block_ = 0;
 
   /// Next record from block compressed data.
+  int64_t next_record_in_compressed_block_len_ = 0;
+
+  /// Next record from block compressed data.
   uint8_t* next_record_in_compressed_block_ = nullptr;
 };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/read-write-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util-test.cc b/be/src/exec/read-write-util-test.cc
index e4448de..453a333 100644
--- a/be/src/exec/read-write-util-test.cc
+++ b/be/src/exec/read-write-util-test.cc
@@ -83,12 +83,19 @@ TEST(ReadWriteUtil, ZeroCompressedLongRequiredBytes) {
 }
 
 void TestPutGetZeroCompressedLong(int64_t val) {
-  uint8_t buffer[9];
+  const int32_t BUFSZ = 9;
+  uint8_t buffer[BUFSZ];
   int64_t read_val;
   int64_t num_bytes = ReadWriteUtil::PutVLong(val, buffer);
-  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val);
+  int64_t read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, BUFSZ);
   EXPECT_EQ(read_bytes, num_bytes);
   EXPECT_EQ(read_val, val);
+  // Out of bound access check, -1 should be returned because buffer size is passed
+  // as 1 byte.
+  if (read_bytes > 1) {
+    read_bytes = ReadWriteUtil::GetVLong(buffer, &read_val, 1);
+    EXPECT_EQ(read_bytes, -1);
+  }
 }
 
 TEST(ReadWriteUtil, ZeroCompressedLong) {

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/read-write-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/read-write-util.h b/be/src/exec/read-write-util.h
index 84d41dd..ecbebaf 100644
--- a/be/src/exec/read-write-util.h
+++ b/be/src/exec/read-write-util.h
@@ -53,11 +53,12 @@ class ReadWriteUtil {
   template<typename T>
   static T GetInt(const uint8_t* buffer);
 
-  /// Get a variable-length Long or int value from a byte buffer.
+  /// Get a variable-length Long or int value from a byte buffer of length size. Access
+  /// beyond the buffer size will return -1.
   /// Returns the length of the long/int
   /// If the size byte is corrupted then return -1;
-  static int GetVLong(uint8_t* buf, int64_t* vlong);
-  static int GetVInt(uint8_t* buf, int32_t* vint);
+  static int GetVLong(uint8_t* buf, int64_t* vlong, int32_t size);
+  static int GetVInt(uint8_t* buf, int32_t* vint, int32_t size);
 
   /// Writes a variable-length Long or int value to a byte buffer.
   /// Returns the number of bytes written.
@@ -68,8 +69,9 @@ class ReadWriteUtil {
   static int VLongRequiredBytes(int64_t val);
 
   /// Read a variable-length Long value from a byte buffer starting at the specified
-  /// byte offset.
-  static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong);
+  /// byte offset and the buffer passed is of length size, accessing beyond the
+  /// buffer length will result in returning -1 value to the caller.
+  static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size);
 
   /// Put an Integer into a buffer in big endian order.  The buffer must be big
   /// enough.
@@ -177,22 +179,30 @@ inline void ReadWriteUtil::PutInt(uint8_t* buf, uint64_t integer) {
   memcpy(buf, &big_endian, sizeof(uint64_t));
 }
 
-inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint) {
+inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint, int32_t size) {
   int64_t vlong = 0;
-  int len = GetVLong(buf, &vlong);
+  int len = GetVLong(buf, &vlong, size);
   *vint = static_cast<int32_t>(vlong);
   return len;
 }
 
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong) {
-  return GetVLong(buf, 0, vlong);
+inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong, int32_t size) {
+  return GetVLong(buf, 0, vlong, size);
 }
 
-inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong) {
+inline int ReadWriteUtil::GetVLong(
+    uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size) {
+  // Buffer access out of bounds.
+  if (size == 0) return -1;
+
+  // Buffer access out of bounds.
+  if (offset > size) return -1;
   int8_t firstbyte = (int8_t) buf[0 + offset];
 
   int len = DecodeVIntSize(firstbyte);
-  if (len > MAX_VINT_LEN) return -1;
+
+  // Buffer access out of bounds.
+  if (len > MAX_VINT_LEN || len > size) return -1;
   if (len == 1) {
     *vlong = static_cast<int64_t>(firstbyte);
     return len;

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/exec/scanner-context.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h
index f4f3bcb..19ea1f4 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -68,6 +68,7 @@ inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
 }
 
 inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
+  DCHECK_GE(length, 0);
   int64_t bytes_left = length;
   // Skip bytes from the boundary buffer first.
   if (boundary_buffer_bytes_left_ > 0) {
@@ -102,6 +103,10 @@ inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
 inline bool ScannerContext::Stream::SkipText(Status* status) {
   int64_t len;
   RETURN_IF_FALSE(ReadVLong(&len, status));
+  if (len < 0) {
+    *status = Status("SkipText: length is negative");
+    return false;
+  }
   RETURN_IF_FALSE(SkipBytes(len, status));
   return true;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index d92dfe2..267e8f8 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -449,6 +449,12 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
     }
 
     while (uncompressed_block_len > 0) {
+      // Check that input length should not be negative.
+      if (input_len < 0) {
+        stringstream ss;
+        ss << " Corruption snappy decomp input_len " << input_len;
+        return Status(ss.str());
+      }
       // Read the length of the next snappy compressed block.
       size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
       input += sizeof(uint32_t);
@@ -464,6 +470,10 @@ static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
               compressed_len, &uncompressed_len)) {
         return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
       }
+      // Check that uncompressed length should be greater than 0.
+      if (uncompressed_len <= 0) {
+        return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
+      }
       DCHECK_GT(uncompressed_len, 0);
 
       if (!size_only) {

http://git-wip-us.apache.org/repos/asf/impala/blob/69e88f70/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 2cdb4f6..4886c4a 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -71,7 +71,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
     # TODO(IMPALA-6772): enable for ORC formats once a new version after release-1.4.3
     # of ORC library is released.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        v.get_value('table_format').file_format in ('avro', 'parquet', 'rc', 'seq') or
         (v.get_value('table_format').file_format == 'text' and
           v.get_value('table_format').compression_codec in ('none', 'lzo')))
 
@@ -90,6 +90,9 @@ class TestScannersFuzzing(ImpalaTestSuite):
       if table_format.compression_codec != 'snap' or \
           table_format.compression_type != 'block':
         pytest.skip()
+    elif table_format.file_format == 'rc' or \
+      table_format.file_format == 'seq':
+        pytest.skip()
     elif table_format.file_format == 'text' and \
         table_format.compression_codec != 'none':
       # decimal_tbl is not present for these file formats
@@ -208,8 +211,10 @@ class TestScannersFuzzing(ImpalaTestSuite):
         # (IMPALA-4013).
         table_format = vector.get_value('table_format')
         if table_format.file_format != 'parquet' \
-            and not (table_format.file_format == 'text' and
-            table_format.compression_codec != 'none'):
+            and not (table_format.file_format == 'text' and \
+            table_format.compression_codec != 'none') \
+            and not table_format.file_format == 'rc' \
+            and not table_format.file_format == 'seq':
           raise
 
   def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng):