You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:10:02 UTC

[27/50] [abbrv] incubator-impala git commit: IMPALA-3337: fix "Cancelled" warnings when LIMIT clause is specified

IMPALA-3337: fix "Cancelled" warnings when LIMIT clause is specified

The cancelled status is propagated in scanner threads to cause them to
shut down once the limit has been satisfied, but depending on the code
path and when abort_on_error=false, this internal status would sometimes
incorrectly end up in the error log. Fix this by factoring out the
abort_on_error handling code so that it's handled more consistently
across scanners. Parquet, RC, and Avro all suffered from this bug.

Testing: exhaustive

Change-Id: I4a91a22608e346ca21a23ea66c855eae54bbced6
Reviewed-on: http://gerrit.cloudera.org:8080/2964
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a0d42496
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a0d42496
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a0d42496

Branch: refs/heads/master
Commit: a0d424965230bd43020b493af24d6e209346823b
Parents: 3b7d5b7
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Apr 29 08:52:51 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:17:57 2016 -0700

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |  3 +--
 be/src/exec/hdfs-parquet-scanner.cc  | 28 +++++++---------------------
 be/src/exec/hdfs-rcfile-scanner.cc   |  8 ++------
 be/src/exec/hdfs-scanner.cc          | 14 ++++++++++++++
 be/src/exec/hdfs-scanner.h           |  6 ++++++
 be/src/exec/hdfs-text-scanner.cc     |  6 +-----
 tests/query_test/test_scanners.py    | 16 ++++++++++++++--
 7 files changed, 45 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index f88717f..dadab51 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -114,8 +114,7 @@ Status BaseSequenceScanner::ProcessSplit() {
     header_ = state_->obj_pool()->Add(AllocateFileHeader());
     Status status = ReadFileHeader();
     if (!status.ok()) {
-      if (state_->abort_on_error()) return status;
-      state_->LogError(status.msg());
+      RETURN_IF_ERROR(LogOrReturnError(status.msg()));
       // We need to complete the ranges for this file.
       CloseFileRanges(stream_->filename());
       return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b32270c..52fe3fa 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -82,18 +82,6 @@ static_assert(
     !(ROWS_PER_FILTER_SELECTIVITY_CHECK & (ROWS_PER_FILTER_SELECTIVITY_CHECK - 1)),
     "ROWS_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
 
-// TODO: refactor error reporting across all scanners to be more consistent (e.g. make
-// sure file name is always included, errors are reported exactly once)
-// TODO: rename these macros so its easier to tell them apart
-
-#define LOG_OR_RETURN_ON_ERROR(error_msg, runtime_state)                \
-  do {                                                                  \
-    if (runtime_state->abort_on_error()) {                              \
-      return Status(error_msg.msg());                                   \
-    }                                                                   \
-    runtime_state->LogError(error_msg);                                 \
-  } while (false)                                                       \
-
 // FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
 // input file. They differentiate these cases from DCHECKs, which indicate conditions that
 // are true unless there's a bug in Impala. We would ideally always return a bad Status
@@ -1182,7 +1170,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
     } else if (num_values_read_ > metadata_->num_values) {
       ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
           metadata_->num_values, num_values_read_, node_.element->name, filename());
-      LOG_OR_RETURN_ON_ERROR(msg, parent_->state_);
+      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
       return Status::OK();
     }
 
@@ -1195,7 +1183,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       // TODO for 2.3: node_.element->name isn't necessarily useful
       ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
           metadata_->num_values, num_values_read_, node_.element->name, filename());
-      LOG_OR_RETURN_ON_ERROR(msg, parent_->state_);
+      RETURN_IF_ERROR(parent_->LogOrReturnError(msg));
       return Status::OK();
     }
 
@@ -1701,11 +1689,9 @@ Status HdfsParquetScanner::ProcessSplit() {
     // Store UDF error in thread local storage or make UDF return status so it can merge
     // with parse_status_.
     RETURN_IF_ERROR(state_->GetQueryStatus());
-    if (UNLIKELY(parse_status_.IsMemLimitExceeded())) return parse_status_;
     if (UNLIKELY(!parse_status_.ok())) {
-      LOG_OR_RETURN_ON_ERROR(parse_status_.msg(), state_);
+      RETURN_IF_ERROR(LogOrReturnError(parse_status_.msg()));
     }
-
     if (scan_node_->ReachedLimit()) return Status::OK();
     if (context_->cancelled()) return Status::OK();
     if (!filters_pass) return Status::OK();
@@ -2897,13 +2883,13 @@ Status HdfsParquetScanner::ValidateColumn(
     if (!schema_element.__isset.precision) {
       ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename(),
           schema_element.name);
-      LOG_OR_RETURN_ON_ERROR(msg, state_);
+      RETURN_IF_ERROR(LogOrReturnError(msg));
     } else {
       if (schema_element.precision != slot_desc->type().precision) {
         // TODO: we could allow a mismatch and do a conversion at this step.
         ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename(), schema_element.name,
             schema_element.precision, slot_desc->type().precision);
-        LOG_OR_RETURN_ON_ERROR(msg, state_);
+        RETURN_IF_ERROR(LogOrReturnError(msg));
       }
     }
 
@@ -2912,13 +2898,13 @@ Status HdfsParquetScanner::ValidateColumn(
       // might only serve to reject otherwise perfectly readable files.
       ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename(),
           schema_element.name);
-      LOG_OR_RETURN_ON_ERROR(msg, state_);
+      RETURN_IF_ERROR(LogOrReturnError(msg));
     }
   } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
       is_converted_type_decimal) {
     ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename(),
         schema_element.name, slot_desc->type().DebugString());
-    LOG_OR_RETURN_ON_ERROR(msg, state_);
+    RETURN_IF_ERROR(LogOrReturnError(msg));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index aee4529..bd37900 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -522,12 +522,8 @@ Status HdfsRCFileScanner::ProcessRange() {
 
         if (error_in_row) {
           error_in_row = false;
-          if (state_->LogHasSpace()) {
-            stringstream ss;
-            ss << "file: " << stream_->filename();
-            state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()), 2);
-          }
-          if (state_->abort_on_error()) return Status(state_->ErrorLog());
+          ErrorMsg msg(TErrorCode::GENERAL, Substitute("file: $0", stream_->filename()));
+          RETURN_IF_ERROR(LogOrReturnError(msg));
         }
 
         current_row->SetTuple(scan_node_->tuple_idx(), tuple);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 028b066..af6ddf7 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -648,6 +648,20 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
   }
 }
 
+Status HdfsScanner::LogOrReturnError(const ErrorMsg& message) const {
+  DCHECK_NE(message.error(), TErrorCode::OK);
+  // If either abort_on_error=true or the error necessitates execution stops
+  // immediately, return an error status.
+  if (state_->abort_on_error() ||
+      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
+      message.error() == TErrorCode::CANCELLED) {
+    return Status(message);
+  }
+  // Otherwise, add the error to the error log and continue.
+  state_->LogError(message);
+  return Status::OK();
+}
+
 string HdfsScanner::PrintPath(const SchemaPath& path, int subpath_idx) const {
   SchemaPath::const_iterator subpath_end =
       subpath_idx == -1 ? path.end() : path.begin() + subpath_idx + 1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 2620da1..9206e77 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -405,6 +405,12 @@ class HdfsScanner {
     return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
   }
 
+  /// Given an error message, determine whether execution should be aborted and, if so,
+  /// return the corresponding error status. Otherwise, log the error and return
+  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
+  /// true or the error is not recoverable and should be handled upstream.
+  Status LogOrReturnError(const ErrorMsg& message) const;
+
   // Convenience function for calling the PrintPath() function in
   // debug-util. 'subpath_idx' can be specified in order to truncate the output to end on
   // the i-th element of 'path' (inclusive).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 958df8b..dee34dc 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -269,11 +269,7 @@ Status HdfsTextScanner::FinishScanRange() {
         stringstream ss;
         ss << "Read failed while trying to finish scan range: " << stream_->filename()
            << ":" << stream_->file_offset() << endl << status.GetDetail();
-        if (state_->abort_on_error()) {
-          return Status(ss.str());
-        } else {
-          state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
-        }
+        RETURN_IF_ERROR(LogOrReturnError(ErrorMsg(TErrorCode::GENERAL, ss.str())));
       } else if (!partial_tuple_empty_ || !boundary_column_.Empty() ||
           !boundary_row_.Empty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a0d42496/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 15181fc..6936765 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -57,15 +57,27 @@ class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):
     super(TestScannersAllTableFormatsWithLimit, cls).add_test_dimensions()
 
   def test_limit(self, vector):
+    vector.get_value('exec_option')['abort_on_error'] = 1
+    self._test_limit(vector)
+    # IMPALA-3337: when continuing on error, the error log should not show errors
+    # (e.g. "Cancelled").
+    vector.get_value('exec_option')['abort_on_error'] = 0
+    self._test_limit(vector)
+
+  def _test_limit(self, vector):
     # Use a small batch size so changing the limit affects the timing of cancellation
     vector.get_value('exec_option')['batch_size'] = 100
     iterations = 50
     query_template = "select * from alltypes limit %s"
     for i in range(1, iterations):
       # Vary the limit to vary the timing of cancellation
-      query = query_template % ((iterations * 100) % 1000 + 1)
-      self.execute_query(query, vector.get_value('exec_option'),
+      limit = (iterations * 100) % 1000 + 1
+      query = query_template % limit
+      result = self.execute_query(query, vector.get_value('exec_option'),
           table_format=vector.get_value('table_format'))
+      assert len(result.data) == limit
+      # IMPALA-3337: The error log should be empty.
+      assert not result.log
 
 # Test case to verify the scanners work properly when the table metadata (specifically the
 # number of columns in the table) does not match the number of columns in the data file.