You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/21 22:39:45 UTC

[1/4] impala git commit: IMPALA-6669: Remove Parquet NeedsSeedingForBatchedReading()

Repository: impala
Updated Branches:
  refs/heads/master 644af57b5 -> 2c1fbecc9


IMPALA-6669: Remove Parquet NeedsSeedingForBatchedReading()

I noticed that we could remove this part of the interface and instead
do the "seeding" in ParquetColumnReader::Read*ValueBatch(). The code
should be easier to understand with level reading and consumption both
driven by the same function instead of being split between files.

Testing:
Ran core tests. This code path should be thoroughly exercised by
the regular scanner tests.

Change-Id: I98f65d0d72e86b1e3db1f3543a03873afb9da062
Reviewed-on: http://gerrit.cloudera.org:8080/9636
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8db3fb2c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8db3fb2c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8db3fb2c

Branch: refs/heads/master
Commit: 8db3fb2cc7107968c7b3d9c36746d1868efc8407
Parents: 644af57
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 14 11:01:05 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 21 20:14:02 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   | 29 ++-----------------
 be/src/exec/hdfs-parquet-scanner.h    |  2 +-
 be/src/exec/parquet-column-readers.cc | 14 ++++++++--
 be/src/exec/parquet-column-readers.h  | 45 +++++++++++++++---------------
 4 files changed, 37 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8db3fb2c/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e279369..0d79f53 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -713,33 +713,8 @@ Status HdfsParquetScanner::NextRowGroup() {
       ReleaseSkippedRowGroupResources();
       continue;
     }
-
-    bool seeding_failed = false;
-    for (ParquetColumnReader* col_reader: column_readers_) {
-      // Seed collection and boolean column readers with NextLevel().
-      // The ScalarColumnReaders use an optimized ReadValueBatch() that
-      // should not be seeded.
-      // TODO: Refactor the column readers to look more like the optimized
-      // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
-      // will allow better sharing of code between the row-wise and column-wise
-      // materialization strategies.
-      if (col_reader->NeedsSeedingForBatchedReading()
-          && !col_reader->NextLevels()) {
-        seeding_failed = true;
-        break;
-      }
-    }
-    if (seeding_failed) {
-      if (!parse_status_.ok()) {
-        RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
-      }
-      ReleaseSkippedRowGroupResources();
-      continue;
-    } else {
-      // Seeding succeeded - we're ready to read the row group.
-      DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
-      break;
-    }
+    DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
+    break;
   }
 
   DCHECK(parse_status_.ok());

http://git-wip-us.apache.org/repos/asf/impala/blob/8db3fb2c/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index f0043b5..ccb109c 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -344,7 +344,7 @@ class HdfsParquetScanner : public HdfsScanner {
       llvm::Function** process_scratch_batch_fn)
       WARN_UNUSED_RESULT;
 
-  /// The repetition level is set to this value to indicate the end of a row group.
+  /// The rep and def levels are set to this value to indicate the end of a row group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
   static const int16_t INVALID_LEVEL = -1;

http://git-wip-us.apache.org/repos/asf/impala/blob/8db3fb2c/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 61839af..7647c73 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -261,8 +261,6 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     return ReadValue<false>(pool, tuple);
   }
 
-  virtual bool NeedsSeedingForBatchedReading() const { return false; }
-
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
       uint8_t* tuple_mem, int* num_values) {
     return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
@@ -777,6 +775,11 @@ bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
 
 bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
   int val_count = 0;
   bool continue_execution = true;
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -800,6 +803,11 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
 
 bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
   int val_count = 0;
   bool continue_execution = true;
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -1214,7 +1222,7 @@ bool BaseScalarColumnReader::NextPage() {
   if (UNLIKELY(!parent_->parse_status_.ok())) return false;
   if (num_buffered_values_ == 0) {
     rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
-    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    def_level_ = HdfsParquetScanner::ROW_GROUP_END;
     pos_current_value_ = HdfsParquetScanner::INVALID_POS;
     return false;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/8db3fb2c/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index fbc98ec..8724f43 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -201,18 +201,14 @@ class ParquetColumnReader {
   /// not in collections.
   virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
 
-  /// Returns true if this reader needs to be seeded with NextLevels() before
-  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
-  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
-  virtual bool NeedsSeedingForBatchedReading() const { return true; }
-
   /// Batched version of ReadValue() that reads up to max_values at once and materializes
   /// them into tuples in tuple_mem. Returns the number of values actually materialized
   /// in *num_values. The return value, error behavior and state changes are generally
   /// the same as in ReadValue(). For example, if an error occurs in the middle of
   /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
   /// this column reader are left in an undefined state, assuming that the caller will
-  /// immediately abort execution.
+  /// immediately abort execution. NextLevels() does *not* need to be called before
+  /// ReadValueBatch(), unlike ReadValue().
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
       uint8_t* tuple_mem, int* num_values);
 
@@ -241,7 +237,11 @@ class ParquetColumnReader {
   void ReadPosition(Tuple* tuple);
 
   /// Returns true if this column reader has reached the end of the row group.
-  inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; }
+  inline bool RowGroupAtEnd() {
+    DCHECK_EQ(rep_level_ == HdfsParquetScanner::ROW_GROUP_END,
+              def_level_ == HdfsParquetScanner::ROW_GROUP_END);
+    return rep_level_ == HdfsParquetScanner::ROW_GROUP_END;
+  }
 
   /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it,
   /// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
@@ -257,18 +257,19 @@ class ParquetColumnReader {
   const SlotDescriptor* pos_slot_desc_;
 
   /// The next value to write into the position slot, if there is one. 64-bit int because
-  /// the pos slot is always a BIGINT Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read).
+  /// the pos slot is always a BIGINT Set to INVALID_POS when this column reader does not
+  /// have a current rep and def level (i.e. before the first NextLevels() call or after
+  /// the last value in the column has been read).
   int64_t pos_current_value_;
 
   /// The current repetition and definition levels of this reader. Advanced via
-  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read). If this is not inside a collection, rep_level_ is
-  /// always 0.
-  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
-  /// The maximum values are cached here because they are accessed in inner loops.
+  /// ReadValue() and NextLevels(). Set to INVALID_LEVEL before the first NextLevels()
+  /// call for a row group or if an error is encountered decoding a level. Set to
+  /// ROW_GROUP_END after the last value in the column has been read). If this is not
+  /// inside a collection, rep_level_ is always 0, INVALID_LEVEL or ROW_GROUP_END.
+  /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
+  /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
+  /// are accessed in inner loops.
   int16_t rep_level_;
   int16_t max_rep_level_;
   int16_t def_level_;
@@ -348,10 +349,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     stream_ = stream;
     metadata_ = metadata;
     num_values_read_ = 0;
-    def_level_ = -1;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
     // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : -1;
-    pos_current_value_ = -1;
+    rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
 
     if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
       RETURN_IF_ERROR(Codec::CreateDecompressor(
@@ -547,9 +548,9 @@ class CollectionColumnReader : public ParquetColumnReader {
 
   /// This is called once for each row group in the file.
   void Reset() {
-    def_level_ = -1;
-    rep_level_ = -1;
-    pos_current_value_ = -1;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    rep_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
   }
 
   virtual void Close(RowBatch* row_batch) {


[3/4] impala git commit: [DOCS] Fixed a typo at line #120

Posted by ta...@apache.org.
[DOCS] Fixed a typo at line #120

Change-Id: I3f726889071950bc7025079ba9be90fd5d71bc9c
Reviewed-on: http://gerrit.cloudera.org:8080/9742
Reviewed-by: John Russell <jr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d9726148
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d9726148
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d9726148

Branch: refs/heads/master
Commit: d972614881e064335fd6df0ca9bbc9220c0cb8f7
Parents: 595421e
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Wed Mar 21 13:22:07 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 21 20:34:22 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_proxy.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d9726148/docs/topics/impala_proxy.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_proxy.xml b/docs/topics/impala_proxy.xml
index 93c3032..653f7f4 100644
--- a/docs/topics/impala_proxy.xml
+++ b/docs/topics/impala_proxy.xml
@@ -118,7 +118,7 @@ under the License.
                 long-running queries. Evaluate whether this setting is
                 appropriate for your combination of workload and client
                 applications. See <xref href="#proxy_balancing" format="dita"/>
-                for
+                for load balancing algorithm options.
               </p>
             </li>
             <li>


[4/4] impala git commit: IMPALA-2782: Allow impala-shell to connect directly to impalad when configured with load balancer and kerberos.

Posted by ta...@apache.org.
IMPALA-2782: Allow impala-shell to connect directly to impalad when
configured with load balancer and kerberos.

This change adds an impala-shell option -b / --kerberos_host_fqdn.
This allows the user to optionally specify the load balancer's host so
that impala-shell will accept a direct connection to impala daemons
in a kerberized cluster.

Change-Id: I4726226a7a3817421b133f74dd4f4cf8c52135f9
Reviewed-on: http://gerrit.cloudera.org:8080/7241
Reviewed-by: <an...@phdata.io>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2c1fbecc
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2c1fbecc
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2c1fbecc

Branch: refs/heads/master
Commit: 2c1fbecc9f1302efab597c827976a944927af649
Parents: d972614
Author: Vincent Tran <vt...@cloudera.com>
Authored: Tue Jun 20 22:09:17 2017 -0400
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 21 20:45:48 2018 +0000

----------------------------------------------------------------------
 shell/impala_client.py                | 18 ++++++++++++++----
 shell/impala_shell.py                 |  3 ++-
 shell/impala_shell_config_defaults.py |  1 +
 shell/option_parser.py                |  7 +++++++
 4 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2c1fbecc/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 795768c..d4bfbee 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -59,11 +59,12 @@ class QueryCancelledByShellException(Exception): pass
 
 class ImpalaClient(object):
 
-  def __init__(self, impalad, use_kerberos=False, kerberos_service_name="impala",
-               use_ssl=False, ca_cert=None, user=None, ldap_password=None,
-               use_ldap=False):
+  def __init__(self, impalad, kerberos_host_fqdn, use_kerberos=False,
+               kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None,
+               ldap_password=None, use_ldap=False):
     self.connected = False
     self.impalad = impalad
+    self.kerberos_host_fqdn = kerberos_host_fqdn
     self.imp_service = None
     self.transport = None
     self.use_kerberos = use_kerberos
@@ -275,7 +276,16 @@ class ImpalaClient(object):
       from TSSLSocketWithWildcardSAN import TSSLSocketWithWildcardSAN
 
     # sasl does not accept unicode strings, explicitly encode the string into ascii.
-    host, port = self.impalad[0].encode('ascii', 'ignore'), int(self.impalad[1])
+    # The kerberos_host_fqdn option exposes the SASL client's hostname attribute to
+    # the user. impala-shell checks to ensure this host matches the host in the kerberos
+    # principal. So in the presence of a load balancer, the its hostname is expected by
+    # impala-shell. Setting this option to the load balancer hostname allows impala-shell to
+    # connect directly to an impalad.
+    if self.kerberos_host_fqdn is not None:
+      host, port = (self.kerberos_host_fqdn.split(':')[0].encode('ascii', 'ignore'),
+            int(self.impalad[1]))
+    else:
+      host, port = self.impalad[0].encode('ascii', 'ignore'), int(self.impalad[1])
     if self.use_ssl:
       if self.ca_cert is None:
         # No CA cert means don't try to verify the certificate

http://git-wip-us.apache.org/repos/asf/impala/blob/2c1fbecc/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 4e57677..43620a8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -139,6 +139,7 @@ class ImpalaShell(object, cmd.Cmd):
     self.is_alive = True
 
     self.impalad = None
+    self.kerberos_host_fqdn = options.kerberos_host_fqdn
     self.use_kerberos = options.use_kerberos
     self.kerberos_service_name = options.kerberos_service_name
     self.use_ssl = options.ssl
@@ -485,7 +486,7 @@ class ImpalaShell(object, cmd.Cmd):
     return completed_cmd
 
   def _new_impala_client(self):
-    return ImpalaClient(self.impalad, self.use_kerberos,
+    return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos,
                         self.kerberos_service_name, self.use_ssl,
                         self.ca_cert, self.user, self.ldap_password,
                         self.use_ldap)

http://git-wip-us.apache.org/repos/asf/impala/blob/2c1fbecc/shell/impala_shell_config_defaults.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index c951477..6abf95e 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -30,6 +30,7 @@ impala_shell_defaults = {
             'history_max': 1000,
             'ignore_query_failure': False,
             'impalad': socket.getfqdn() + ':21000',
+            'kerberos_host_fqdn': None,
             'kerberos_service_name': 'impala',
             'output_delimiter': '\\t',
             'output_file': None,

http://git-wip-us.apache.org/repos/asf/impala/blob/2c1fbecc/shell/option_parser.py
----------------------------------------------------------------------
diff --git a/shell/option_parser.py b/shell/option_parser.py
index f39e0ce..3c8ae08 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -125,6 +125,13 @@ def get_option_parser(defaults):
 
   parser.add_option("-i", "--impalad", dest="impalad",
                     help="<host:port> of impalad to connect to \t\t")
+  parser.add_option("-b", "--kerberos_host_fqdn", dest="kerberos_host_fqdn",
+                    help="If set, overrides the expected hostname of the Impalad's "
+                         "kerberos service principal. impala-shell will check that "
+                         "the server's principal matches this hostname. This may be "
+                         "used when impalad is configured to be accessed via a "
+                         "load-balancer, but it is desired for impala-shell to talk "
+                         "to a specific impalad directly.")
   parser.add_option("-q", "--query", dest="query",
                     help="Execute a query without the shell")
   parser.add_option("-f", "--query_file", dest="query_file",


[2/4] impala git commit: IMPALA-3866: Improve error reporting for scratch write errors

Posted by ta...@apache.org.
IMPALA-3866: Improve error reporting for scratch write errors

The error messages coming from DiskIoMgr::Write() are enhanced by this
change. A mapping is introduced between the errno set by the low-level
functions open(), fdopen(), fseek(), fwrite() or fclose() and an
error message for display purposes. If any of these functions
fails, the returned error message is taken from this mapping.

In addition, there were two functions, NewFile() and
FileAllocateSpace(), that always returned Status::OK(). I made them
void and removed the status checks from the call sites.

For testing purposes, a fault injection mechanism is introduced to
simulate the cases where the above-mentioned functions fail.

Change-Id: I5aa7b424209b1a5ef8dc7d04c5ba58788e91aad7
Reviewed-on: http://gerrit.cloudera.org:8080/9420
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/595421e7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/595421e7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/595421e7

Branch: refs/heads/master
Commit: 595421e77d17eeeacaf15a5e4d7314c7026958c1
Parents: 8db3fb2
Author: Gabor Kaszab <ga...@cloudera.com>
Authored: Fri Feb 2 08:23:45 2018 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 21 20:31:36 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/io/CMakeLists.txt                |   3 +
 be/src/runtime/io/disk-io-mgr-test.cc           | 116 +++++++++++++++++--
 be/src/runtime/io/disk-io-mgr.cc                |  49 +++-----
 be/src/runtime/io/disk-io-mgr.h                 |  13 ++-
 be/src/runtime/io/error-converter.cc            |  83 +++++++++++++
 be/src/runtime/io/error-converter.h             |  70 +++++++++++
 .../local-file-system-with-fault-injection.cc   |  67 +++++++++++
 .../io/local-file-system-with-fault-injection.h |  72 ++++++++++++
 be/src/runtime/io/local-file-system.cc          | 101 ++++++++++++++++
 be/src/runtime/io/local-file-system.h           |  56 +++++++++
 be/src/runtime/io/request-ranges.h              |   1 +
 be/src/runtime/tmp-file-mgr-internal.h          |   8 +-
 be/src/runtime/tmp-file-mgr-test.cc             |  22 ++--
 be/src/runtime/tmp-file-mgr.cc                  |  35 ++----
 be/src/runtime/tmp-file-mgr.h                   |   4 +-
 be/src/util/error-util.cc                       |   8 +-
 be/src/util/error-util.h                        |   4 +
 17 files changed, 624 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index ae89509..226fdc6 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -24,6 +24,9 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime/io")
 add_library(Io
   disk-io-mgr.cc
   disk-io-mgr-stress.cc
+  local-file-system.cc
+  local-file-system-with-fault-injection.cc
+  error-converter.cc
   request-context.cc
   scan-range.cc
 )

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index b03ec31..a16d06e 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,6 +22,7 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
+#include "runtime/io/local-file-system-with-fault-injection.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -70,6 +71,10 @@ class DiskIoMgrTest : public testing::Test {
           sizeof(int32_t));
     }
 
+    if (!status.ok()) {
+      EXPECT_EQ(expected_status.GetDetail(), status.GetDetail());
+    }
+
     {
       lock_guard<mutex> l(written_mutex_);
       ++num_ranges_written_;
@@ -177,6 +182,20 @@ class DiskIoMgrTest : public testing::Test {
     return range;
   }
 
+  // Function to trigger Write() on DiskIoMgr with the purpose of deliberately making it
+  // fail through a failure injection interface. The purpose is to test the error outputs
+  // in the cases when open(), fdopen(), fseek(), fwrite() and fclose() fail.
+  // 'function_name' specifies the name of the function where the failure is injected.
+  //  e.g. "fdopen"
+  void ExecuteWriteFailureTest(DiskIoMgr* io_mgr, const string& file_name,
+      const string& function_name, int err_no, const string& expected_error);
+
+  // Auxiliary function to ExecuteWriteFailureTest(). Handles the
+  // DiskIoMgr::AddWriteRange() call.
+  void AddWriteRange(DiskIoMgr* io_mgr, int num_of_writes, int32_t* data,
+      const string& tmp_file, int offset, RequestContext* writer,
+      const string& expected_output);
+
   ObjectPool pool_;
 
   mutex written_mutex_;
@@ -244,6 +263,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
 // validate that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  int num_of_writes = 2;
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 1, 10);
@@ -255,9 +275,10 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   // Write to file in non-existent directory.
   WriteRange** new_range = pool_.Add(new WriteRange*);
   WriteRange::WriteDoneCallback callback =
-      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
-          (DiskIoMgr*)nullptr, (RequestContext*)nullptr, data,
-          Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes, new_range,
+          nullptr, nullptr, data,
+          Status(TErrorCode::DISK_IO_ERROR, "open() failed for /non-existent/file.txt. "
+              "The given path doesn't exist. errno=2"), _1);
   *new_range = pool_.Add(new WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -272,9 +293,10 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   }
 
   new_range = pool_.Add(new WriteRange*);
-  callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-      new_range, (DiskIoMgr*)nullptr, (RequestContext*)nullptr,
-      data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
+  callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes,
+      new_range, nullptr, nullptr, data,
+      Status(TErrorCode::DISK_IO_ERROR, "fseek() failed for /tmp/disk_io_mgr_test.txt. "
+          "Invalid inputs. errno=22, offset=-1"), _1);
 
   *new_range = pool_.Add(new WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -282,12 +304,92 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 
   {
     unique_lock<mutex> lock(written_mutex_);
-    while (num_ranges_written_ < 2) writes_done_.Wait(lock);
+    while (num_ranges_written_ < num_of_writes) writes_done_.Wait(lock);
   }
   num_ranges_written_ = 0;
   io_mgr.UnregisterContext(writer.get());
 }
 
+// Tests the error messages when some of the disk I/O related low level function fails in
+// DiskIoMgr::Write()
+TEST_F(DiskIoMgrTest, WriteErrors) {
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
+  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  string file_name = "/tmp/disk_io_mgr_test.txt";
+
+  // Fail open()
+  string expected_error = Substitute("open() failed for $0. Either the path length or a "
+      "path component exceeds the maximum length. errno=36", file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "open", ENAMETOOLONG, expected_error);
+
+  // Fail fdopen()
+  expected_error = Substitute("fdopen() failed for $0. Not enough memory. errno=12",
+      file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "fdopen", ENOMEM, expected_error);
+
+  // Fail fseek()
+  expected_error = Substitute("fseek() failed for $0. Maximum file size reached. "
+      "errno=27, offset=0", file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "fseek", EFBIG, expected_error);
+
+  // Fail fwrite()
+  expected_error = Substitute("fwrite() failed for $0. Disk level I/O error occured. "
+      "errno=5, range_length=4", file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "fwrite", EIO, expected_error);
+
+  // Fail fclose()
+  expected_error = Substitute("fclose() failed for $0. Device doesn't exist. errno=6",
+      file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "fclose", ENXIO, expected_error);
+
+  // Fail open() with unknown error code to the ErrorConverter. This results falling back
+  // to the GetStrErrMsg() logic.
+  expected_error = Substitute("open() failed for $0. errno=49, description=Error(49): "
+                              "Protocol driver not attached", file_name);
+  ExecuteWriteFailureTest(&io_mgr, file_name, "open", EUNATCH, expected_error);
+}
+
+void DiskIoMgrTest::ExecuteWriteFailureTest(DiskIoMgr* io_mgr, const string& file_name,
+    const string& function_name, int err_no, const string& expected_error) {
+  int num_of_writes = 1;
+  num_ranges_written_ = 0;
+  unique_ptr<RequestContext> writer = io_mgr->RegisterContext(nullptr);
+  int32_t data = rand();
+  int success = CreateTempFile(file_name.c_str(), 100);
+  if (success != 0) {
+    LOG(ERROR) << "Error creating temp file " << file_name << " of size 100";
+    EXPECT_TRUE(false);
+  }
+
+  std::unique_ptr<LocalFileSystemWithFaultInjection> fs(
+      new LocalFileSystemWithFaultInjection());
+  fs->SetWriteFaultInjection(function_name, err_no);
+  // DiskIoMgr takes responsibility of fs from this point.
+  io_mgr->SetLocalFileSystem(std::move(fs));
+  AddWriteRange(io_mgr, num_of_writes, &data, file_name, 0, writer.get(),
+      expected_error);
+
+  {
+    unique_lock<mutex> lock(written_mutex_);
+    while (num_ranges_written_ < num_of_writes) writes_done_.Wait(lock);
+  }
+  num_ranges_written_ = 0;
+  io_mgr->UnregisterContext(writer.get());
+}
+
+void DiskIoMgrTest::AddWriteRange(DiskIoMgr* io_mgr, int num_of_writes, int32_t* data,
+    const string& file_name, int offset, RequestContext* writer,
+    const string& expected_output) {
+  WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_of_writes,
+          nullptr, nullptr, nullptr, data,
+          Status(TErrorCode::DISK_IO_ERROR, expected_output), _1);
+  WriteRange* write_range = pool_.Add(new WriteRange(file_name, offset, 0, callback));
+  write_range->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+  EXPECT_OK(io_mgr->AddWriteRange(writer, write_range));
+}
+
 // Issue a number of writes, cancel the writer context and issue more writes.
 // AddWriteRange() is expected to succeed before the cancel and fail after it.
 // The writes themselves may finish with status cancelled or ok.

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 8ff6609..0ede5b5 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -19,6 +19,7 @@
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "runtime/io/handle-cache.inline.h"
+#include "runtime/io/error-converter.h"
 
 #include <boost/algorithm/string.hpp>
 
@@ -245,6 +246,7 @@ DiskIoMgr::DiskIoMgr() :
   }
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
+  local_file_system_.reset(new LocalFileSystem());
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
@@ -265,6 +267,7 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
+  local_file_system_.reset(new LocalFileSystem());
 }
 
 DiskIoMgr::~DiskIoMgr() {
@@ -1084,54 +1087,32 @@ unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
 void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
   Status ret_status = Status::OK();
   FILE* file_handle = nullptr;
-  // Raw open() syscall will create file if not present when passed these flags.
-  int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
-  if (fd < 0) {
-    ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
-        Substitute("Opening '$0' for write failed with errno=$1 description=$2",
-                                     write_range->file_, errno, GetStrErrMsg())));
-  } else {
-    file_handle = fdopen(fd, "wb");
-    if (file_handle == nullptr) {
-      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
-          Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
-                                       GetStrErrMsg())));
-    }
-  }
+  Status close_status = Status::OK();
+  ret_status = local_file_system_->OpenForWrite(write_range->file(), O_RDWR | O_CREAT,
+      S_IRUSR | S_IWUSR, &file_handle);
+  if (!ret_status.ok()) goto end;
 
-  if (file_handle != nullptr) {
-    ret_status = WriteRangeHelper(file_handle, write_range);
+  ret_status = WriteRangeHelper(file_handle, write_range);
 
-    int success = fclose(file_handle);
-    if (ret_status.ok() && success != 0) {
-      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
-          Substitute("fclose($0) failed", write_range->file_)));
-    }
-  }
+  close_status = local_file_system_->Fclose(file_handle, write_range);
+  if (ret_status.ok() && !close_status.ok()) ret_status = close_status;
 
+end:
   HandleWriteFinished(writer_context, write_range, ret_status);
 }
 
 Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
   // Seek to the correct offset and perform the write.
-  int success = fseek(file_handle, write_range->offset(), SEEK_SET);
-  if (success != 0) {
-    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
-        Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
-        write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
-  }
+  RETURN_IF_ERROR(local_file_system_->Fseek(file_handle, write_range->offset(), SEEK_SET,
+      write_range));
 
 #ifndef NDEBUG
   if (FLAGS_stress_scratch_write_delay_ms > 0) {
     SleepForMs(FLAGS_stress_scratch_write_delay_ms);
   }
 #endif
-  int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
-  if (bytes_written < write_range->len_) {
-    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
-        Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
-        write_range->len_, write_range->file_, errno, GetStrErrMsg())));
-  }
+  RETURN_IF_ERROR(local_file_system_->Fwrite(file_handle, write_range));
+
   if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != nullptr) {
     ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index e5d7eb4..cfac328 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -31,6 +31,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "runtime/io/handle-cache.h"
+#include "runtime/io/local-file-system.h"
 #include "runtime/io/request-ranges.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/aligned-new.h"
@@ -214,7 +215,7 @@ class DiskIoMgr : public CacheLineAligned {
 
   /// Clean up all threads and resources. This is mostly useful for testing since
   /// for impalad, this object is never destroyed.
-  ~DiskIoMgr();
+  virtual ~DiskIoMgr();
 
   /// Initialize the IoMgr. Must be called once before any of the other APIs.
   Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
@@ -369,6 +370,13 @@ class DiskIoMgr : public CacheLineAligned {
   /// 'bytes_to_free' is -1.
   void GcIoBuffers(int64_t bytes_to_free = -1);
 
+  /// Replaces the LocalFileSystem object used for disk I/O with 'fs'. DiskIoMgr takes
+  /// ownership of 'fs'; the object held before the call is destroyed by the assignment.
+  /// Only for testing, to install a fault-injecting version of LocalFileSystem.
+  void SetLocalFileSystem(std::unique_ptr<LocalFileSystem> fs) {
+    local_file_system_ = std::move(fs);
+  }
+
   /// The maximum number of ready buffers that can be queued in a scan range. Having two
   /// queued buffers (plus the buffer that is returned to the client) gives good
   /// performance in most scenarios:
@@ -415,6 +423,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// provide a MemTracker.
   boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
 
+  // Handles the low level I/O functionality.
+  std::unique_ptr<LocalFileSystem> local_file_system_;
+
   /// Number of worker(read) threads per rotational disk. Also the max depth of queued
   /// work to the disk.
   const int num_io_threads_per_rotational_disk_;

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/error-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/error-converter.cc b/be/src/runtime/io/error-converter.cc
new file mode 100644
index 0000000..0ca25b7
--- /dev/null
+++ b/be/src/runtime/io/error-converter.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/error-converter.h"
+#include "gutil/strings/substitute.h"
+#include "common/names.h"
+
+namespace impala {
+
+using std::unordered_map;
+
+unordered_map<int, string> ErrorConverter::errno_to_error_text_map_(  // errno -> text
+    {{EACCES, "Access denied for the process' user"},
+     {EINTR,   "Internal error occured."},  // NOTE(review): POSIX EINTR = interrupted call
+     {EINVAL,  "Invalid inputs."},
+     {EMFILE,  "Process level opened file descriptor count is reached."},
+     {ENAMETOOLONG,
+         "Either the path length or a path component exceeds the maximum length."},
+     {ENFILE,  "OS level opened file descriptor count is reached."},
+     {ENOENT,  "The given path doesn't exist."},
+     {ENOSPC,  "No space left on device."},
+     {ENOTDIR, "It is not a directory."},
+     {EOVERFLOW, "File size can't be represented."},
+     {EROFS,   "The file system is read only."},
+     {EAGAIN,  "Resource temporarily unavailable."},
+     {EBADF,   "The given file descriptor is invalid."},
+     {ENOMEM,  "Not enough memory."},
+     {EFBIG,   "Maximum file size reached."},
+     {EIO,     "Disk level I/O error occured."},
+     {ENXIO,   "Device doesn't exist."}});  // other errnos fall back to GetStrErrMsg()
+
+Status ErrorConverter::GetErrorStatusFromErrno(const string& function_name,
+    const string& file_path, int err_no, const Params& params) {
+  return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR, GetErrorText(function_name,
+      file_path, err_no, params)));  // the code is always DISK_IO_ERROR; only text varies
+}
+
+string ErrorConverter::GetErrorText(const string& function_name,
+    const string& file_path, int err_no, Params params) {  // 'params' is a local copy
+  const string* error_text_body = GetErrorTextBody(err_no);
+  if (error_text_body != nullptr) {  // a custom text exists for this errno
+    params["errno"] = SimpleItoa(err_no);  // errno is reported as one of the pairs
+    return Substitute("$0 failed for $1. $2 $3", function_name, file_path,
+        *error_text_body, GetParamsString(params));
+  }
+  return Substitute("$0 failed for $1. errno=$2, description=$3", function_name,
+      file_path, err_no, GetStrErrMsg(err_no));  // no mapping; 'params' is not reported
+}
+
+string ErrorConverter::GetParamsString(const Params& params) {
+  string result = "";
+  bool first = true;  // suppresses the ", " separator before the first pair
+  for (const auto& item : params) {  // Params is an unordered_map: order is unspecified
+    if (!first) result.append(", ");
+    result.append(item.first).append("=").append(item.second);
+    first = false;
+  }
+  return result;  // empty string when 'params' is empty
+}
+
+const string* ErrorConverter::GetErrorTextBody(int err_no) {
+  auto error_mapping_it = errno_to_error_text_map_.find(err_no);
+  if (error_mapping_it != errno_to_error_text_map_.end()) {
+    return &error_mapping_it->second;  // points into the static map, not a copy
+  }
+  return nullptr;  // no custom text for this errno
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/error-converter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/error-converter.h b/be/src/runtime/io/error-converter.h
new file mode 100644
index 0000000..52f88af
--- /dev/null
+++ b/be/src/runtime/io/error-converter.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_ERROR_CONVERTER_H
+#define IMPALA_RUNTIME_IO_ERROR_CONVERTER_H
+
+#include <string>
+#include <unordered_map>
+
+#include "common/status.h"
+
+namespace impala {
+
+/// This class translates 'errno' values set by disk I/O related functions to Status
+/// objects with the DISK_IO_ERROR error code and an error text corresponding to
+/// 'errno'. Instead of using GetStrErrMsg() this class provides richer, custom error
+/// messages so that the root cause of a disk I/O issue can be identified more easily.
+/// If an internal mapping is not found for the errno then we fall back to
+/// GetStrErrMsg().
+///
+/// A sample error text is:
+/// fseek() failed for <file_path>. Not enough memory. errno=12
+class ErrorConverter {
+public:
+  typedef std::unordered_map<std::string, std::string> Params;
+
+  /// Given the name of the function that set errno and the file's path that
+  /// was being manipulated this function returns a Status object that contains
+  /// DISK_IO_ERROR error code and an error text corresponding to 'err_no'. The error
+  /// text is provided by the errno_to_error_text_map_ member. The key-value pairs in
+  /// 'params' provide a way for the user to extend the error text with additional
+  /// information in the format of "key1=value1, key2=value2".
+  static Status GetErrorStatusFromErrno(const std::string& function_name,
+      const std::string& file_path, int err_no, const Params& params = Params());
+
+private:
+  /// Maps errno to error text
+  static std::unordered_map<int, std::string> errno_to_error_text_map_;
+
+  /// This function is a helper for GetErrorStatusFromErrno() and returns the error text
+  /// corresponding to 'err_no'.
+  static std::string GetErrorText(const std::string& function_name,
+      const std::string& file_path, int err_no, Params params);
+
+  /// Looks up 'err_no' in 'errno_to_error_text_map_' and returns a pointer to the mapped
+  /// error text. Returns nullptr if 'err_no' is not found.
+  static const std::string* GetErrorTextBody(int err_no);
+
+  /// Returns a string formatted as "key1=value1, key2=value2" where key-value pairs are
+  /// taken from the 'params' map.
+  static std::string GetParamsString(const Params& params);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/local-file-system-with-fault-injection.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-system-with-fault-injection.cc b/be/src/runtime/io/local-file-system-with-fault-injection.cc
new file mode 100644
index 0000000..9e8b93d
--- /dev/null
+++ b/be/src/runtime/io/local-file-system-with-fault-injection.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/local-file-system-with-fault-injection.h"
+
+namespace impala {
+namespace io {
+
+void LocalFileSystemWithFaultInjection::SetWriteFaultInjection(
+    const string& function_name, int err_no) {  // replaces any earlier injection
+  fault_injection_to_write_ = WriteFaultInjectionItem(function_name, err_no);
+}
+
+int LocalFileSystemWithFaultInjection::OpenAux(const char* file, int option1,
+    int option2) {
+  if (DebugFaultInjection("open")) return -1;  // injected failure; errno has been set
+  return LocalFileSystem::OpenAux(file, option1, option2);
+}
+
+FILE* LocalFileSystemWithFaultInjection::FdopenAux(int file_desc, const char* options) {
+  if (DebugFaultInjection("fdopen")) return nullptr;  // injected failure; errno is set
+  return LocalFileSystem::FdopenAux(file_desc, options);
+}
+
+int LocalFileSystemWithFaultInjection::FseekAux(FILE* file_handle, off_t offset,
+    int whence) {
+  if (DebugFaultInjection("fseek")) return -1;  // injected failure; errno has been set
+  return LocalFileSystem::FseekAux(file_handle, offset, whence);
+}
+
+size_t LocalFileSystemWithFaultInjection::FwriteAux(FILE* file_handle,
+    const WriteRange* write_range) {
+  if (DebugFaultInjection("fwrite")) return 0;  // injected failure: zero bytes written
+  return LocalFileSystem::FwriteAux(file_handle, write_range);
+}
+
+int LocalFileSystemWithFaultInjection::FcloseAux(FILE* file_handle) {
+  if (DebugFaultInjection("fclose")) return EOF;  // 'file_handle' is not closed here
+  return LocalFileSystem::FcloseAux(file_handle);
+}
+
+bool LocalFileSystemWithFaultInjection::DebugFaultInjection(
+    const string& function_name) {
+  if (fault_injection_to_write_ &&  // an injection has been configured
+      fault_injection_to_write_->function == function_name) {
+    errno = fault_injection_to_write_->err_no;  // the errno the caller will report
+    return true;
+  }
+  return false;  // nothing configured, or configured for a different function
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/local-file-system-with-fault-injection.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-system-with-fault-injection.h b/be/src/runtime/io/local-file-system-with-fault-injection.h
new file mode 100644
index 0000000..d638bc6
--- /dev/null
+++ b/be/src/runtime/io/local-file-system-with-fault-injection.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_LOCAL_FILE_SYSTEM_WITH_FAULT_INJECTION
+#define IMPALA_RUNTIME_IO_LOCAL_FILE_SYSTEM_WITH_FAULT_INJECTION
+
+#include "runtime/io/local-file-system.h"
+
+#include <boost/optional.hpp>
+#include <string>
+
+namespace impala {
+namespace io {
+
+// The purpose of this class is to override the functions in LocalFileSystem so that
+// a failure can be injected into them. This simulates a failing disk I/O function.
+class LocalFileSystemWithFaultInjection : public LocalFileSystem {
+public:
+  // Makes the wrapper named 'function_name' (e.g. "fwrite") fail with errno 'err_no'.
+  void SetWriteFaultInjection(const string& function_name, int err_no);
+
+  virtual ~LocalFileSystemWithFaultInjection() {}
+
+private:
+  // Functions with the purpose of injecting faults into the respective functions in
+  // LocalFileSystem. Each checks DebugFaultInjection() and if it returns true, then the
+  // function returns a value that indicates failure to the caller. As a side effect
+  // errno is also set. If DebugFaultInjection() returns false then no fault is injected
+  // and the respective function in LocalFileSystem is invoked as it would be normally.
+  virtual int OpenAux(const char* file, int option1, int option2) override;
+  virtual FILE* FdopenAux(int file_desc, const char* options) override;
+  virtual int FseekAux(FILE* file_handle, off_t offset, int whence) override;
+  virtual size_t FwriteAux(FILE* file_handle, const WriteRange* write_range) override;
+  virtual int FcloseAux(FILE* file_handle) override;
+
+  // Used for defining fault injection. This structure represents the name of a function
+  // meant to fail along with the error code that will be used to populate errno.
+  struct WriteFaultInjectionItem {
+    WriteFaultInjectionItem(const string& function_name, int e)
+        : function(function_name), err_no(e) {}
+    string function;
+    int err_no;
+  };
+
+  // Set via SetWriteFaultInjection(). While it holds a value, the wrapper whose name
+  // matches 'function' fails on every call. Empty by default, in which case no fault is
+  // injected. This class declares no method that clears it once it has been set.
+  boost::optional<WriteFaultInjectionItem> fault_injection_to_write_;
+
+  // Compares 'function_name' to fault_injection_to_write_->function. If they match
+  // then sets errno to fault_injection_to_write_->err_no and returns true. Returns false
+  // otherwise, including when no fault injection has been set.
+  bool DebugFaultInjection(const string& function_name);
+};
+
+}
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/local-file-system.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-system.cc b/be/src/runtime/io/local-file-system.cc
new file mode 100644
index 0000000..3ad6aa4
--- /dev/null
+++ b/be/src/runtime/io/local-file-system.cc
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/io/error-converter.h"
+#include "runtime/io/local-file-system.h"
+#include "runtime/io/request-ranges.h"
+
+#include <fcntl.h>
+
+namespace impala {
+namespace io {
+
+int LocalFileSystem::OpenAux(const char* file, int option1, int option2) {
+  return open(file, option1, option2);  // option1 = flags, option2 = mode of open()
+}
+
+FILE* LocalFileSystem::FdopenAux(int file_desc, const char* options) {
+  return fdopen(file_desc, options);  // 'options' is the fdopen() mode string
+}
+
+Status LocalFileSystem::OpenForWrite(const char* file_name, int oflag, int mode,
+    FILE** file) {
+  DCHECK(file_name != nullptr);
+  DCHECK(file != nullptr);
+
+  int file_desc = OpenAux(file_name, oflag, mode);
+  if (file_desc < 0) {  // '*file' is left untouched on this path
+    return ErrorConverter::GetErrorStatusFromErrno("open()", file_name, errno);
+  }
+
+  *file = FdopenAux(file_desc, "wb");
+  if (*file == nullptr) {
+    Status fdopen_status = ErrorConverter::GetErrorStatusFromErrno("fdopen()", file_name,
+        errno);  // errno is read before close() below can overwrite it
+    if (close(file_desc) < 0) {  // no FILE* wraps the descriptor, so close it directly
+      fdopen_status.MergeStatus(ErrorConverter::GetErrorStatusFromErrno("close()",
+          file_name, errno));
+    }
+    return fdopen_status;
+  }
+  return Status::OK();  // '*file' now wraps 'file_desc'
+}
+
+Status LocalFileSystem::Fseek(FILE* file_handle, off_t offset, int whence,
+    const WriteRange* write_range) {  // 'write_range' is only read for its file name
+  DCHECK(file_handle != nullptr);
+  if (FseekAux(file_handle, offset, whence) != 0) {
+    return ErrorConverter::GetErrorStatusFromErrno("fseek()", write_range->file(),
+        errno, {{"offset", SimpleItoa(offset)}});  // the offset is added to the text
+  }
+  return Status::OK();
+}
+
+int LocalFileSystem::FseekAux(FILE* file_handle, off_t offset, int whence){
+  return fseek(file_handle, offset, whence);  // fseek() takes the offset as a long
+}
+
+Status LocalFileSystem::Fwrite(FILE* file_handle, const WriteRange* write_range) {
+  DCHECK(file_handle != nullptr);
+  DCHECK(write_range != nullptr);
+  int64_t bytes_written = FwriteAux(file_handle, write_range);
+  if (bytes_written < write_range->len_) {  // a short write is reported as an error
+    return ErrorConverter::GetErrorStatusFromErrno("fwrite()", write_range->file(),
+        errno, {{"range_length", SimpleItoa(write_range->len_)}});
+  }
+  return Status::OK();  // all len_ bytes were accepted by fwrite()
+}
+
+size_t LocalFileSystem::FwriteAux(FILE* file_handle, const WriteRange* write_range) {
+  return fwrite(write_range->data_, 1, write_range->len_, file_handle);  // bytes written
+}
+
+Status LocalFileSystem::Fclose(FILE* file_handle, const WriteRange* write_range) {
+  DCHECK(file_handle != nullptr);
+  if (FcloseAux(file_handle) != 0) {  // 'write_range' only supplies the file name
+    return ErrorConverter::GetErrorStatusFromErrno("fclose()", write_range->file(),
+        errno);
+  }
+  return Status::OK();
+}
+
+int LocalFileSystem::FcloseAux(FILE* file_handle) {
+  return fclose(file_handle);  // 0 on success, EOF on failure
+}
+
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/local-file-system.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-system.h b/be/src/runtime/io/local-file-system.h
new file mode 100644
index 0000000..e30edce
--- /dev/null
+++ b/be/src/runtime/io/local-file-system.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_IO_LOCAL_FILE_SYSTEM_H
+#define IMPALA_RUNTIME_IO_LOCAL_FILE_SYSTEM_H
+
+#include "common/status.h"
+
+namespace impala {
+namespace io {
+
+class WriteRange;
+
+// This class introduces wrapper functions around the open(), fdopen(), fseek(), fwrite()
+// and fclose() disk I/O functions. The wrappers also contain the error checking logic
+// and convert the outcome of the I/O operations to a Status object.
+class LocalFileSystem {
+public:
+  // A wrapper around open() and fdopen(). For the possible values of oflag and mode
+  // see the documentation of open(). Sets '*file' to the FILE* returned from fdopen().
+  Status OpenForWrite(const char* file_name, int oflag, int mode, FILE** file);
+
+  Status Fseek(FILE* file_handle, off_t offset, int whence,
+      const WriteRange* write_range);  // 'write_range' names the file in error texts
+  Status Fwrite(FILE* file_handle, const WriteRange* write_range);  // writes its data
+  Status Fclose(FILE* file_handle, const WriteRange* write_range);
+
+  virtual ~LocalFileSystem() {}
+
+protected:
+  // Wrapper functions around open(), fdopen(), fseek(), fwrite() and fclose().
+  // Virtual so that fault injection can be implemented through inheritance.
+  virtual int OpenAux(const char* file, int option1, int option2);
+  virtual FILE* FdopenAux(int file_desc, const char* options);
+  virtual int FseekAux(FILE* file_handle, off_t offset, int whence);
+  virtual size_t FwriteAux(FILE* file_handle, const WriteRange* write_range);
+  virtual int FcloseAux(FILE* file_handle);
+};
+
+}
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 222f847..609f8da 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -446,6 +446,7 @@ class WriteRange : public RequestRange {
   friend class DiskIoMgr;
   friend class RequestContext;
   friend class ScanRange;
+  friend class LocalFileSystem;
 
   /// Data to be written. RequestRange::len_ contains the length of data
   /// to be written.

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/tmp-file-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index dd8bd07..5d11c4d 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -38,11 +38,9 @@ class TmpFileMgr::File {
 
   /// Allocates 'num_bytes' bytes in this file for a new block of data.
   /// The file size is increased by a call to truncate() if necessary.
-  /// Returns Status::OK() and sets 'offset' to the file offset of the first
-  /// byte in the allocated range on success.
-  /// Returns an error status if an unexpected error occurs, e.g. the file could not
-  /// be created.
-  Status AllocateSpace(int64_t num_bytes, int64_t* offset);
+  /// Sets 'offset' to the file offset of the first byte in the allocated
+  /// range on success.
+  void AllocateSpace(int64_t num_bytes, int64_t* offset);
 
   /// Called when an IO error is encountered for this file. Logs the error and blacklists
   /// the file.

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 3091c58..a61f844 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -110,15 +110,15 @@ class TmpFileMgrTest : public ::testing::Test {
   }
 
   /// Helper to call the private TmpFileMgr::NewFile() method.
-  static Status NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
+  static void NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
       TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
-    return mgr->NewFile(group, device_id, new_file);
+    mgr->NewFile(group, device_id, new_file);
   }
 
   /// Helper to call the private File::AllocateSpace() method.
-  static Status FileAllocateSpace(
+  static void FileAllocateSpace(
       TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) {
-    return file->AllocateSpace(num_bytes, offset);
+    file->AllocateSpace(num_bytes, offset);
   }
 
   /// Helper to call the private FileGroup::AllocateSpace() method.
@@ -206,7 +206,7 @@ TEST_F(TmpFileMgrTest, TestFileAllocation) {
   int64_t next_offset = 0;
   for (int i = 0; i < num_write_sizes; ++i) {
     int64_t offset;
-    ASSERT_OK(FileAllocateSpace(file, write_sizes[i], &offset));
+    FileAllocateSpace(file, write_sizes[i], &offset);
     EXPECT_EQ(next_offset, offset);
     next_offset = offset + write_sizes[i];
   }
@@ -309,12 +309,12 @@ TEST_F(TmpFileMgrTest, TestReportError) {
 
   // Attempts to expand bad file should succeed.
   int64_t offset;
-  ASSERT_OK(FileAllocateSpace(bad_file, 128, &offset));
+  FileAllocateSpace(bad_file, 128, &offset);
   // The good device should still be usable.
-  ASSERT_OK(FileAllocateSpace(good_file, 128, &offset));
+  FileAllocateSpace(good_file, 128, &offset);
   // Attempts to allocate new files on bad device should succeed.
   unique_ptr<TmpFileMgr::File> bad_file2;
-  ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2));
+  NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2);
   ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
@@ -336,14 +336,14 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
   vector<TmpFileMgr::File*> allocated_files;
   ASSERT_OK(CreateFiles(&file_group, &allocated_files));
   int64_t offset;
-  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
+  FileAllocateSpace(allocated_files[0], 1, &offset);
 
   // Make scratch non-writable and test allocation at different stages:
   // new file creation, files with no allocated blocks. files with allocated space.
   // No errors should be encountered during allocation since allocation is purely logical.
   chmod(scratch_subdirs[0].c_str(), 0);
-  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
-  ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset));
+  FileAllocateSpace(allocated_files[0], 1, &offset);
+  FileAllocateSpace(allocated_files[1], 1, &offset);
 
   chmod(scratch_subdirs[0].c_str(), S_IRWXU);
   ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 3807670..5adfcab 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -150,7 +150,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::NewFile(
+void TmpFileMgr::NewFile(
     FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
@@ -164,7 +164,6 @@ Status TmpFileMgr::NewFile(
   new_file_path /= file_name.str();
 
   new_file->reset(new File(file_group, device_id, new_file_path.string()));
-  return Status::OK();
 }
 
 string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
@@ -197,11 +196,10 @@ TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string&
   DCHECK(file_group != nullptr);
 }
 
-Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
+void TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
   *offset = bytes_allocated_;
   bytes_allocated_ += num_bytes;
-  return Status::OK();
 }
 
 int TmpFileMgr::File::AssignDiskQueue() const {
@@ -258,17 +256,10 @@ Status TmpFileMgr::FileGroup::CreateFiles() {
   // Initialize the tmp files and the initial file to use.
   for (int i = 0; i < tmp_devices.size(); ++i) {
     TmpFileMgr::DeviceId device_id = tmp_devices[i];
-    // It is possible for a device to be blacklisted after it was returned by
-    // ActiveTmpDevices(), handle this gracefully by skipping devices if NewFile()
-    // fails.
     unique_ptr<TmpFileMgr::File> tmp_file;
-    Status status = tmp_file_mgr_->NewFile(this, device_id, &tmp_file);
-    if (status.ok()) {
-      tmp_files_.emplace_back(std::move(tmp_file));
-      ++files_allocated;
-    } else {
-      scratch_errors_.push_back(std::move(status));
-    }
+    tmp_file_mgr_->NewFile(this, device_id, &tmp_file);
+    tmp_files_.emplace_back(std::move(tmp_file));
+    ++files_allocated;
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
@@ -321,18 +312,10 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     *tmp_file = tmp_files_[next_allocation_index_].get();
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
-    if (status.ok()) {
-      scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
-      current_bytes_allocated_ += num_bytes;
-      return Status::OK();
-    }
-    // Log error and try other files if there was a problem. Problematic files will be
-    // blacklisted so we will not repeatedly log the same error.
-    LOG(WARNING) << "Error while allocating range in scratch file '"
-                 << (*tmp_file)->path() << "': " << status.msg().msg()
-                 << ". Will try another scratch file.";
-    scratch_errors_.push_back(status);
+    (*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset);
+    scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
+    current_bytes_allocated_ += num_bytes;
+    return Status::OK();
   }
   Status err_status(TErrorCode::SCRATCH_ALLOCATION_FAILED,
       join(tmp_file_mgr_->tmp_dirs_, ","), GetBackendString());

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 95072ae..b6fce43 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -405,8 +405,8 @@ class TmpFileMgr {
   /// directory on the specified device id. The caller owns the returned handle and is
   /// responsible for deleting it. The file is not created - creation is deferred until
   /// the file is written.
-  Status NewFile(FileGroup* file_group, DeviceId device_id,
-      std::unique_ptr<File>* new_file) WARN_UNUSED_RESULT;
+  void NewFile(FileGroup* file_group, DeviceId device_id,
+    std::unique_ptr<File>* new_file);
 
   bool initialized_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/util/error-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index 69a0355..94d2ccd 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -29,10 +29,14 @@ namespace impala {
 string GetStrErrMsg() {
   // Save errno. "<<" could reset it.
   int e = errno;
-  if (e == 0) return "";
+  return GetStrErrMsg(e);
+}
+
+string GetStrErrMsg(int err_no) {
+  if (err_no == 0) return "";
   stringstream ss;
   char buf[1024];
-  ss << "Error(" << e << "): " << strerror_r(e, buf, 1024);
+  ss << "Error(" << err_no << "): " << strerror_r(err_no, buf, 1024);
   return ss.str();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/595421e7/be/src/util/error-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h
index c366ab2..44dcb26 100644
--- a/be/src/util/error-util.h
+++ b/be/src/util/error-util.h
@@ -36,6 +36,10 @@ namespace impala {
 /// Returns empty string if errno is 0.
 std::string GetStrErrMsg();
 
+/// Same as GetStrErrMsg() above, but takes the error number as a parameter instead of
+/// reading errno itself. Returns empty string if 'err_no' is 0.
+std::string GetStrErrMsg(int err_no);
+
 /// Returns an error message warning that the given table names are missing relevant
 /// table/and or column statistics.
 std::string GetTablesMissingStatsWarning(