You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/13 15:26:48 UTC

[5/6] incubator-impala git commit: IMPALA-5890: Abort queries if scanner hits IO errors

IMPALA-5890: Abort queries if scanner hits IO errors

Prior to this fix, an error in ScannerContext::Stream::GetNextBuffer()
could leave the stream in an inconsistent state:

- The DiskIoMgr hits EOF unexpectedly, cancels the scan range and enqueues
a buffer with eosr set.
- The ScannerContext::Stream tries to read more bytes, but since it has
hit eosr, it tries to read beyond the end of the scan range using
DiskIoMgr::Read().
- The previous read error resulted in a new file handle being opened.
The now truncated, smaller file causes the seek to fail.
- Then during error handling, the BaseSequenceScanner calls SkipToSync()
and trips over the NULL pointer in the IO buffer.

In my reproduction this only happens with the file handle cache enabled,
which causes Impala to see two different sized handles: the one from the
cache when the query starts, and the one after reopening the file.

To fix this, we change the I/O manager to always return DISK_IO_ERROR
for errors and we abort a query if we receive such an error in the
scanner.

This change also fixes GetBytesInternal() to maintain the invariant that
the output buffer points to the boundary buffer whenever the latter
contains some data.

I tested this by running the repro from the JIRA and impalad did not
crash but aborted the queries. I also ran the repro with
abort_on_error=1, and with the file handle cache disabled.

Text files are not affected by this problem, since the
text scanner doesn't try to recover from errors during ProcessRange()
but wraps it in RETURN_IF_ERROR instead. With this change queries abort
with the same error.

Parquet files are also not affected since they have the metadata at the
end. Truncated files immediately fail with this error:
WARNINGS: File 'hdfs://localhost:20500/test-warehouse/tpch.partsupp_parquet/foo.0.parq'
has an invalid version number: <UTF8 Garbage>

Change-Id: I44dc95184c241fbcdbdbebad54339530680d3509
Reviewed-on: http://gerrit.cloudera.org:8080/8011
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/322e2dc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/322e2dc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/322e2dc8

Branch: refs/heads/master
Commit: 322e2dc80259cfa712fc6d0d224d2c2c16a6708d
Parents: caa382c
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Sep 5 14:57:57 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 13 09:20:53 2017 +0000

----------------------------------------------------------------------
 be/src/common/status.h                   | 10 +++++++
 be/src/exec/base-sequence-scanner.cc     | 21 ++++++-------
 be/src/exec/scanner-context.cc           | 36 ++++++++++++++++++++--
 be/src/exec/scanner-context.h            |  7 +++--
 be/src/runtime/disk-io-mgr-scan-range.cc | 43 +++++++++++----------------
 be/src/runtime/disk-io-mgr-test.cc       |  6 ++--
 be/src/runtime/disk-io-mgr.cc            | 28 +++++++++--------
 be/src/runtime/runtime-state.cc          |  4 ++-
 common/thrift/generate_error_codes.py    |  4 +++
 9 files changed, 104 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index cf7481b..0f057e7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -201,11 +201,21 @@ class NODISCARD Status {
         && msg_->error() == TErrorCode::MEM_LIMIT_EXCEEDED;
   }
 
+  bool IsInternalError() const {
+    return msg_ != NULL
+        && msg_->error() == TErrorCode::INTERNAL_ERROR;
+  }
+
   bool IsRecoverableError() const {
     return msg_ != NULL
         && msg_->error() == TErrorCode::RECOVERABLE_ERROR;
   }
 
+  bool IsDiskIoError() const {
+    return msg_ != NULL
+        && msg_->error() == TErrorCode::DISK_IO_ERROR;
+  }
+
   /// Returns the error message associated with a non-successful status.
   const ErrorMsg& msg() const {
     DCHECK(msg_ != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index a522a1f..da67bb5 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -174,18 +174,19 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
 
   Status status = ProcessRange(row_batch);
   if (!status.ok()) {
-    if (status.IsCancelled() || status.IsMemLimitExceeded()) return status;
-
     // Log error from file format parsing.
-    state_->LogError(ErrorMsg(TErrorCode::SEQUENCE_SCANNER_PARSE_ERROR,
-        stream_->filename(), stream_->file_offset(),
-        (stream_->eof() ? "(EOF)" : "")));
-
-    // Make sure errors specified in the status are logged as well
-    state_->LogError(status.msg());
+    // TODO(IMPALA-5922): Include the file and offset in errors inside the scanners.
+    if (!status.IsCancelled() &&
+        !status.IsMemLimitExceeded() &&
+        !status.IsInternalError() &&
+        !status.IsDiskIoError()) {
+      state_->LogError(ErrorMsg(TErrorCode::SEQUENCE_SCANNER_PARSE_ERROR,
+          stream_->filename(), stream_->file_offset(),
+          (stream_->eof() ? "(EOF)" : "")));
+    }
 
-    // If abort on error then return, otherwise try to recover.
-    if (state_->abort_on_error()) return status;
+    // This checks for abort_on_error.
+    RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
 
     // Recover by skipping to the next sync.
     parse_status_ = Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index b1577a1..16a09e4 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -151,7 +151,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
 
   if (!eosr) {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
-    RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_));
+    Status status = scan_range_->GetNext(&io_buffer_);
+    DCHECK(!status.ok() || io_buffer_ != NULL);
+    RETURN_IF_ERROR(status);
   } else {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
@@ -184,6 +186,15 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   }
 
   DCHECK(io_buffer_ != NULL);
+  if (UNLIKELY(io_buffer_ == NULL)) {
+    // This has bitten us before, so we defend against NULL in release builds here. It
+    // indicates an error in the IoMgr, which did not return a valid buffer.
+    // TODO(IMPALA-5914): Remove this check once we're confident we're not hitting it.
+    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: "
+        "Failed to receive buffer from scan range for file $0 at offset $1",
+        filename(), offset));
+  }
+
   parent_->scan_node_->num_owned_io_buffers_.Add(1);
   io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
   io_buffer_bytes_left_ = io_buffer_->len();
@@ -207,7 +218,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
   }
 
   if (boundary_buffer_bytes_left_ > 0) {
-    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
+    DCHECK(ValidateBufferPointers());
     DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
     *out_buffer = boundary_buffer_pos_;
     // Don't return more bytes past eosr
@@ -225,6 +236,9 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     // We're at the end of the boundary buffer and the current IO buffer. Get a new IO
     // buffer and set the current buffer to it.
     RETURN_IF_ERROR(GetNextBuffer());
+    // Check that we're not pointing to the IO buffer if there are bytes left in the
+    // boundary buffer.
+    DCHECK_EQ(boundary_buffer_bytes_left_, 0);
     output_buffer_pos_ = &io_buffer_pos_;
     output_buffer_bytes_left_ = &io_buffer_bytes_left_;
   }
@@ -238,6 +252,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     total_bytes_returned_ += *len;
   }
   DCHECK_GE(bytes_left(), 0);
+  DCHECK(ValidateBufferPointers());
   return Status::OK();
 }
 
@@ -254,6 +269,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     }
   }
 
+  DCHECK(ValidateBufferPointers());
+
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
     // We must copy the remainder of 'io_buffer_' to 'boundary_buffer_' before advancing
     // to handle the case when the read straddles a block boundary. Preallocate
@@ -262,6 +279,11 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
       RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
       RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
       boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
+
+      // Make state consistent in case we return early with an error below.
+      io_buffer_bytes_left_ = 0;
+      output_buffer_pos_ = &boundary_buffer_pos_;
+      output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
     }
 
     int64_t remaining_requested_len = requested_len - boundary_buffer_->len();
@@ -306,6 +328,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     }
   }
 
+
+  DCHECK(ValidateBufferPointers());
   return Status::OK();
 }
 
@@ -314,6 +338,14 @@ bool ScannerContext::cancelled() const {
   return static_cast<HdfsScanNode*>(scan_node_)->done();
 }
 
+bool ScannerContext::Stream::ValidateBufferPointers() const {
+  // If there are bytes left in the boundary buffer, the output buffer pointers must point
+  // to it.
+  return boundary_buffer_bytes_left_ == 0 ||
+      (output_buffer_pos_ == &boundary_buffer_pos_ &&
+      output_buffer_bytes_left_ == &boundary_buffer_bytes_left_);
+}
+
 Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
   return Status(TErrorCode::SCANNER_INCOMPLETE_READ, length, bytes_read,
       filename(), file_offset());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 470ec01..bd5623c 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -87,7 +87,7 @@ class ScannerContext {
     ///    If the requested buffer straddles io buffers, a copy is done here.
     ///  - *out_len is the number of bytes returned.
     ///  - *status is set if there is an error.
-    /// Returns true if the call was success (i.e. status->ok())
+    /// Returns true if the call was successful (i.e. status->ok())
     /// This should only be called from the scanner thread.
     /// Note that this will return bytes past the end of the scan range until the end of
     /// the file.
@@ -264,6 +264,9 @@ class ScannerContext {
     /// resources are also freed.
     void ReleaseCompletedResources(RowBatch* batch, bool done);
 
+    /// Validates that the output buffer pointers point to the correct buffer.
+    bool ValidateBufferPointers() const;
+
     /// Error-reporting functions.
     Status ReportIncompleteRead(int64_t length, int64_t bytes_read);
     Status ReportInvalidRead(int64_t length);
@@ -301,7 +304,7 @@ class ScannerContext {
   /// The stream is created in the runtime state's object pool
   Stream* AddStream(DiskIoMgr::ScanRange* range);
 
-  /// Returns false it scan_node_ is multi-threaded and has been cancelled.
+  /// Returns false if scan_node_ is multi-threaded and has been cancelled.
   /// Always returns false if the scan_node_ is not multi-threaded.
   bool cancelled() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index df74fcc..8ed5138 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -291,14 +291,16 @@ Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
     exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
         mtime(), reader_, true);
     if (exclusive_hdfs_fh_ == nullptr) {
-      return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+      return Status(TErrorCode::DISK_IO_ERROR,
+          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
     }
 
     if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
       // Destroy the file handle and remove it from the cache.
       io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
       exclusive_hdfs_fh_ = nullptr;
-      return Status(Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
+      return Status(TErrorCode::DISK_IO_ERROR,
+          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
           GetHdfsErrorMsg("")));
     }
   } else {
@@ -306,19 +308,14 @@ Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
 
     local_file_ = fopen(file(), "r");
     if (local_file_ == nullptr) {
-      string error_msg = GetStrErrMsg();
-      stringstream ss;
-      ss << "Could not open file: " << file_ << ": " << error_msg;
-      return Status(ss.str());
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open file: $0: $1",
+            file_, GetStrErrMsg()));
     }
     if (fseek(local_file_, offset_, SEEK_SET) == -1) {
       fclose(local_file_);
       local_file_ = nullptr;
-      string error_msg = GetStrErrMsg();
-      stringstream ss;
-      ss << "Could not seek to " << offset_ << " for file: " << file_
-         << ": " << error_msg;
-      return Status(ss.str());
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to $0 "
+          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
     }
   }
   if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
@@ -425,7 +422,8 @@ Status DiskIoMgr::ScanRange::Read(
       borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
           mtime(), reader_, false);
       if (borrowed_hdfs_fh == nullptr) {
-        return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+        return Status(TErrorCode::DISK_IO_ERROR,
+            GetHdfsErrorMsg("Failed to open HDFS file ", file_));
       }
       hdfs_file = borrowed_hdfs_fh->file();
     }
@@ -450,7 +448,8 @@ Status DiskIoMgr::ScanRange::Read(
           current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
               buffer + *bytes_read, chunk_size);
           if (current_bytes_read == -1) {
-            status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+            status = Status(TErrorCode::DISK_IO_ERROR,
+                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
           }
         } else {
           // If the file handle is borrowed, it may not be at the appropriate
@@ -458,11 +457,8 @@ Status DiskIoMgr::ScanRange::Read(
           bool seek_failed = false;
           if (borrowed_hdfs_fh != nullptr) {
             if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
-              string error_msg = GetHdfsErrorMsg("");
-              stringstream ss;
-              ss << "Error seeking to " << position_in_file << " in file: "
-                 << file_ << " " << error_msg;
-              status = Status(ss.str());
+              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
+                  " in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
               seek_failed = true;
             }
           }
@@ -470,7 +466,8 @@ Status DiskIoMgr::ScanRange::Read(
             current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
                 chunk_size);
             if (current_bytes_read == -1) {
-              status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+              status = Status(TErrorCode::DISK_IO_ERROR,
+                  GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
             }
           }
         }
@@ -513,11 +510,8 @@ Status DiskIoMgr::ScanRange::Read(
     DCHECK_LE(*bytes_read, bytes_to_read);
     if (*bytes_read < bytes_to_read) {
       if (ferror(local_file_) != 0) {
-        string error_msg = GetStrErrMsg();
-        stringstream ss;
-        ss << "Error reading from " << file_ << " at byte offset: "
-           << (offset_ + bytes_read_) << ": " << error_msg;
-        return Status(ss.str());
+        return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0"
+            "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
       } else {
         // On Linux, we should only get partial reads from block devices on error or eof.
         DCHECK(feof(local_file_) != 0);
@@ -569,7 +563,6 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
   // between errors and partially cached blocks here.
   if (bytes_read < len()) {
-    stringstream ss;
     VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
       << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
     // Close the scan range. 'read_succeeded' is still false, so the caller will fall back

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 05c99e7..7c60efa 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -61,7 +61,7 @@ class DiskIoMgrTest : public testing::Test {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
     } else {
-      EXPECT_TRUE(status.code() == expected_status.code());
+      EXPECT_EQ(status.code(), expected_status.code());
     }
     if (status.ok()) {
       DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
@@ -261,7 +261,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
           (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
-          Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+          Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -278,7 +278,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
       new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
-      data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+      data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
 
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 0fe16d2..e77d9ca 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -541,13 +541,16 @@ int64_t DiskIoMgr::GetReadThroughput() {
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
-    return Status(Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
   }
   if (range->offset_ < 0) {
-    return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative offset $0", range->offset_));
   }
   if (range->len_ < 0) {
-    return Status(Substitute("Invalid scan range. Negative length $0", range->len_));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative length $0", range->len_));
   }
   return Status::OK();
 }
@@ -665,9 +668,9 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader,
 
   if (range->len() > max_buffer_size_
       && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    return Status(Substitute("Internal error: cannot perform sync read of '$0' bytes "
-                   "that is larger than the max read buffer size '$1'.",
-            range->len(), max_buffer_size_));
+    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
+        "perform sync read of '$0' bytes that is larger than the max read buffer size "
+        "'$1'.", range->len(), max_buffer_size_));
   }
 
   vector<DiskIoMgr::ScanRange*> ranges;
@@ -1164,13 +1167,13 @@ void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_ra
   // Raw open() syscall will create file if not present when passed these flags.
   int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
   if (fd < 0) {
-    ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("Opening '$0' for write failed with errno=$1 description=$2",
                                      write_range->file_, errno, GetStrErrMsg())));
   } else {
     file_handle = fdopen(fd, "wb");
     if (file_handle == nullptr) {
-      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
           Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
                                        GetStrErrMsg())));
     }
@@ -1181,7 +1184,7 @@ void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_ra
 
     int success = fclose(file_handle);
     if (ret_status.ok() && success != 0) {
-      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
           Substitute("fclose($0) failed", write_range->file_)));
     }
   }
@@ -1193,7 +1196,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
   // Seek to the correct offset and perform the write.
   int success = fseek(file_handle, write_range->offset(), SEEK_SET);
   if (success != 0) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
         write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
   }
@@ -1205,7 +1208,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
 #endif
   int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
   if (bytes_written < write_range->len_) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
         write_range->len_, write_range->file_, errno, GetStrErrMsg())));
   }
@@ -1290,7 +1293,8 @@ Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fnam
   *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
       &cache_hit);
   if (*fid == nullptr) {
-    return Status(GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
   }
   DCHECK(!cache_hit);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index fd1c061..8f48439 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -179,8 +179,10 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
   // If either abort_on_error=true or the error necessitates execution stops
   // immediately, return an error status.
   if (abort_on_error() ||
+      message.error() == TErrorCode::CANCELLED ||
       message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
-      message.error() == TErrorCode::CANCELLED) {
+      message.error() == TErrorCode::INTERNAL_ERROR ||
+      message.error() == TErrorCode::DISK_IO_ERROR) {
     return Status(message);
   }
   // Otherwise, add the error to the error log and continue.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 3c37bef..ad07963 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -333,6 +333,10 @@ error_codes = (
      "Queued reason: $2"),
 
   ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"),
+
+  ("DISK_IO_ERROR", 110, "Disk I/O error: $0"),
+
+
 )
 
 import sys