Posted to commits@impala.apache.org by ta...@apache.org on 2017/09/13 15:26:44 UTC

[1/6] incubator-impala git commit: Bump Kudu version to 3f49724

Repository: incubator-impala
Updated Branches:
  refs/heads/master 84233b297 -> 6596bebe0


Bump Kudu version to 3f49724

Change-Id: I21aff562e2bca90436a8d0206ffca44b712bc1f1
Reviewed-on: http://gerrit.cloudera.org:8080/8040
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4a5757fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4a5757fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4a5757fa

Branch: refs/heads/master
Commit: 4a5757fae063f6935d05941783904a8b04697fa0
Parents: 84233b2
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Sep 12 09:33:22 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Sep 12 21:06:42 2017 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4a5757fa/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c3a25df..74d888c 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,7 +72,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=462-a06b20680a
+export IMPALA_TOOLCHAIN_BUILD_ID=465-9a2affdeab
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -120,7 +120,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=a71ecfd
+export IMPALA_KUDU_VERSION=3f49724
 
 # Kudu version used to identify Java client jar from maven
 export KUDU_JAVA_VERSION=1.6.0-cdh5.14.0-SNAPSHOT


[4/6] incubator-impala git commit: IMPALA-5597: Try casting targetExpr when building runtime filter plan

Posted by ta...@apache.org.
IMPALA-5597: Try casting targetExpr when building runtime filter plan

This patch fixes a bug that caused a precondition check to fail when
generating runtime filter plans. The lhs and rhs of a join predicate might
have different types when the eq predicate function accepts wildcard-typed
parameters. In that case the existing code considered the types of the source
and target exprs mismatched and threw an exception while generating runtime
filters. This patch instead tries to cast the target expr to the same type as
the source expr. Test cases are added to runtime-filter-propagation.test and
runtime_row_filters.test.
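
The fix itself is in the Java frontend (see the RuntimeFilterGenerator diff
below). As a rough illustration only, the new control flow looks like the
following Python sketch; every name in it is hypothetical, not an Impala API:

def compute_target_expr(target_expr, src_type, cast):
    # Runtime filters are based on hashing, so the source and target exprs
    # must have exactly the same type; try an implicit cast to reconcile them.
    if target_expr.type == src_type:
        return target_expr
    try:
        return cast(target_expr, src_type)
    except ValueError:
        # Incompatible types: skip this scan node instead of failing the plan.
        return None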

Change-Id: I0d66673e280ce13cd3a514236c0c2ecbc50475a6
Reviewed-on: http://gerrit.cloudera.org:8080/7949
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/caa382c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/caa382c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/caa382c2

Branch: refs/heads/master
Commit: caa382c284fcae6b795c48d53cace6956fc41371
Parents: 34d63e9
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Fri Sep 1 12:49:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 13 07:39:03 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AnalyticExpr.java    |  3 +--
 .../java/org/apache/impala/analysis/Expr.java   |  3 +--
 .../impala/analysis/FunctionCallExpr.java       |  3 +--
 .../impala/planner/RuntimeFilterGenerator.java  | 23 +++++++++++---------
 .../PlannerTest/runtime-filter-propagation.test | 22 +++++++++++++++++++
 .../queries/QueryTest/runtime_row_filters.test  | 15 +++++++++++++
 6 files changed, 53 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index 15d5b4d..28de6da 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -830,8 +830,7 @@ public class AnalyticExpr extends Expr {
   }
 
   @Override
-  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-      throws AnalysisException {
+  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) {
     Expr e = super.substituteImpl(smap, analyzer);
     if (!(e instanceof AnalyticExpr)) return e;
     // Re-sync state after possible child substitution.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 5dc0438..2b31b61 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -862,8 +862,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
    * Exprs that have non-child exprs which should be affected by substitutions must
    * override this method and apply the substitution to such exprs as well.
    */
-  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-      throws AnalysisException {
+  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) {
     if (isImplicitCast()) return getChild(0).substituteImpl(smap, analyzer);
     if (smap != null) {
       Expr substExpr = smap.get(this);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 5de6f6b..2bc2860 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -618,8 +618,7 @@ public class FunctionCallExpr extends Expr {
   public Expr clone() { return new FunctionCallExpr(this); }
 
   @Override
-  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer)
-      throws AnalysisException {
+  protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) {
     Expr e = super.substituteImpl(smap, analyzer);
     if (!(e instanceof FunctionCallExpr)) return e;
     FunctionCallExpr fn = (FunctionCallExpr) e;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 67c3532..646eb48 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -36,6 +36,8 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.thrift.TRuntimeFilterDesc;
@@ -581,20 +583,21 @@ public final class RuntimeFilterGenerator {
       }
       Preconditions.checkState(exprSlots.size() == smap.size());
       try {
-        targetExpr = targetExpr.substitute(smap, analyzer, true);
+        targetExpr = targetExpr.substitute(smap, analyzer, false);
+      } catch (Exception e) {
+        return null;
+      }
+    }
+    Type srcType = filter.getSrcExpr().getType();
+    // Types of targetExpr and srcExpr must be exactly the same since runtime filters are
+    // based on hashing.
+    if (!targetExpr.getType().equals(srcType)) {
+      try {
+        targetExpr = targetExpr.castTo(srcType);
       } catch (Exception e) {
-        // An exception is thrown if we cannot generate a target expr from this
-        // scan node that has the same type as the lhs expr of the join predicate
-        // from which the runtime filter was generated. We skip that scan node and will
-        // try to assign the filter to a different scan node.
-        //
-        // TODO: Investigate if we can generate a type-compatible source/target expr
-        // pair from that scan node instead of skipping it.
         return null;
       }
     }
-    Preconditions.checkState(
-        targetExpr.getType().matchesType(filter.getSrcExpr().getType()));
     return targetExpr;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
index 602505b..c1675af 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-propagation.test
@@ -1434,3 +1434,25 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.id
 ====
+# IMPALA-5597: Runtime filter should be generated and assigned successfully when the
+# source expr and target expr have different decimal types.
+select *
+from tpch_parquet.lineitem
+left join tpch_parquet.part on if(l_orderkey % 2 = 0, NULL, l_partkey) = p_partkey
+where l_orderkey = 965 and l_extendedprice * l_tax = p_retailprice;
+---- Plan
+PLAN-ROOT SINK
+|
+02:HASH JOIN [RIGHT OUTER JOIN]
+|  hash predicates: p_partkey = if(l_orderkey % 2 = 0, NULL, l_partkey)
+|  other predicates: p_retailprice = l_extendedprice * l_tax
+|  runtime filters: RF000 <- if(l_orderkey % 2 = 0, NULL, l_partkey), RF001 <- l_extendedprice * l_tax
+|
+|--00:SCAN HDFS [tpch_parquet.lineitem]
+|     partitions=1/1 files=3 size=193.71MB
+|     predicates: l_orderkey = 965
+|
+01:SCAN HDFS [tpch_parquet.part]
+   partitions=1/1 files=1 size=6.23MB
+   runtime filters: RF000 -> p_partkey, RF001 -> p_retailprice
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/caa382c2/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 01e1055..7cb5884 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -319,3 +319,18 @@ from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2
 ---- RESULTS
 0
 ====
+
+
+---- QUERY
+# Test case 15: filter with a predicate that has different decimal precision between
+# lhs expr and rhs expr.
+# IMPALA-5597: Runtime filter should be generated and assigned successfully when the
+# source expr and target expr have different decimal types.
+
+select count(*)
+from tpch_parquet.lineitem
+left join tpch_parquet.part on if(l_orderkey % 2 = 0, NULL, l_partkey) = p_partkey
+where l_orderkey = 965 and l_extendedprice * l_tax = p_retailprice;
+---- RESULTS
+1
+====


[5/6] incubator-impala git commit: IMPALA-5890: Abort queries if scanner hits IO errors

Posted by ta...@apache.org.
IMPALA-5890: Abort queries if scanner hits IO errors

Prior to this fix, an error in ScannerContext::Stream::GetNextBuffer()
could leave the stream in an inconsistent state:

- The DiskIoMgr hits EOF unexpectedly, cancels the scan range and enqueues
a buffer with eosr set.
- The ScannerContext::Stream tries to read more bytes, but since it has
hit eosr, it tries to read beyond the end of the scan range using
DiskIoMgr::Read().
- The previous read error resulted in a new file handle being opened.
The now truncated, smaller file causes the seek to fail.
- Then during error handling, the BaseSequenceScanner calls SkipToSync()
and trips over the NULL pointer in the IO buffer.

In my reproduction this only happens with the file handle cache enabled,
which causes Impala to see two differently sized handles: the one from the
cache when the query starts, and the one after reopening the file.

To fix this, we change the I/O manager to always return DISK_IO_ERROR
for errors and we abort a query if we receive such an error in the
scanner.
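
The decision of when to abort versus recover is centralized in
RuntimeState::LogOrReturnError (see the runtime-state.cc diff below). A
minimal Python sketch of that logic, with hypothetical names:

FATAL_ERROR_CODES = {"CANCELLED", "MEM_LIMIT_EXCEEDED", "INTERNAL_ERROR",
                     "DISK_IO_ERROR"}

def log_or_return_error(code, message, abort_on_error, error_log):
    # Fatal errors (and any error under abort_on_error) stop the query.
    if abort_on_error or code in FATAL_ERROR_CODES:
        return (code, message)  # propagated to the caller as a failed status
    # Otherwise record the error and let execution continue.
    error_log.append((code, message))
    return None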

This change also fixes GetBytesInternal() to maintain the invariant that
the output buffer points to the boundary buffer whenever the latter
contains some data.
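
The invariant is checked by the new ValidateBufferPointers() in the
scanner-context diff below; the condition amounts to this small Python sketch
(names hypothetical):

def buffer_pointers_valid(boundary_bytes_left, output_pos, boundary_pos):
    # Either the boundary buffer is empty, or the output buffer pointer
    # refers to the boundary buffer.
    return boundary_bytes_left == 0 or output_pos is boundary_pos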

I tested this by running the repro from the JIRA and impalad did not
crash but aborted the queries. I also ran the repro with
abort_on_error=1, and with the file handle cache disabled.

Text files are not affected by this problem, since the text scanner doesn't
try to recover from errors during ProcessRange() but wraps the call in
RETURN_IF_ERROR instead. With this change such queries abort with the same
error.

Parquet files are also not affected since they have the metadata at the
end. Truncated files immediately fail with this error:
WARNINGS: File 'hdfs://localhost:20500/test-warehouse/tpch.partsupp_parquet/foo.0.parq'
has an invalid version number: <UTF8 Garbage>

Change-Id: I44dc95184c241fbcdbdbebad54339530680d3509
Reviewed-on: http://gerrit.cloudera.org:8080/8011
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/322e2dc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/322e2dc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/322e2dc8

Branch: refs/heads/master
Commit: 322e2dc80259cfa712fc6d0d224d2c2c16a6708d
Parents: caa382c
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Sep 5 14:57:57 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 13 09:20:53 2017 +0000

----------------------------------------------------------------------
 be/src/common/status.h                   | 10 +++++++
 be/src/exec/base-sequence-scanner.cc     | 21 ++++++-------
 be/src/exec/scanner-context.cc           | 36 ++++++++++++++++++++--
 be/src/exec/scanner-context.h            |  7 +++--
 be/src/runtime/disk-io-mgr-scan-range.cc | 43 +++++++++++----------------
 be/src/runtime/disk-io-mgr-test.cc       |  6 ++--
 be/src/runtime/disk-io-mgr.cc            | 28 +++++++++--------
 be/src/runtime/runtime-state.cc          |  4 ++-
 common/thrift/generate_error_codes.py    |  4 +++
 9 files changed, 104 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index cf7481b..0f057e7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -201,11 +201,21 @@ class NODISCARD Status {
         && msg_->error() == TErrorCode::MEM_LIMIT_EXCEEDED;
   }
 
+  bool IsInternalError() const {
+    return msg_ != NULL
+        && msg_->error() == TErrorCode::INTERNAL_ERROR;
+  }
+
   bool IsRecoverableError() const {
     return msg_ != NULL
         && msg_->error() == TErrorCode::RECOVERABLE_ERROR;
   }
 
+  bool IsDiskIoError() const {
+    return msg_ != NULL
+        && msg_->error() == TErrorCode::DISK_IO_ERROR;
+  }
+
   /// Returns the error message associated with a non-successful status.
   const ErrorMsg& msg() const {
     DCHECK(msg_ != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index a522a1f..da67bb5 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -174,18 +174,19 @@ Status BaseSequenceScanner::GetNextInternal(RowBatch* row_batch) {
 
   Status status = ProcessRange(row_batch);
   if (!status.ok()) {
-    if (status.IsCancelled() || status.IsMemLimitExceeded()) return status;
-
     // Log error from file format parsing.
-    state_->LogError(ErrorMsg(TErrorCode::SEQUENCE_SCANNER_PARSE_ERROR,
-        stream_->filename(), stream_->file_offset(),
-        (stream_->eof() ? "(EOF)" : "")));
-
-    // Make sure errors specified in the status are logged as well
-    state_->LogError(status.msg());
+    // TODO(IMPALA-5922): Include the file and offset in errors inside the scanners.
+    if (!status.IsCancelled() &&
+        !status.IsMemLimitExceeded() &&
+        !status.IsInternalError() &&
+        !status.IsDiskIoError()) {
+      state_->LogError(ErrorMsg(TErrorCode::SEQUENCE_SCANNER_PARSE_ERROR,
+          stream_->filename(), stream_->file_offset(),
+          (stream_->eof() ? "(EOF)" : "")));
+    }
 
-    // If abort on error then return, otherwise try to recover.
-    if (state_->abort_on_error()) return status;
+    // This checks for abort_on_error.
+    RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
 
     // Recover by skipping to the next sync.
     parse_status_ = Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index b1577a1..16a09e4 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -151,7 +151,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
 
   if (!eosr) {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
-    RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_));
+    Status status = scan_range_->GetNext(&io_buffer_);
+    DCHECK(!status.ok() || io_buffer_ != NULL);
+    RETURN_IF_ERROR(status);
   } else {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
@@ -184,6 +186,15 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   }
 
   DCHECK(io_buffer_ != NULL);
+  if (UNLIKELY(io_buffer_ == NULL)) {
+    // This has bitten us before, so we defend against NULL in release builds here. It
+    // indicates an error in the IoMgr, which did not return a valid buffer.
+    // TODO(IMPALA-5914): Remove this check once we're confident we're not hitting it.
+    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: "
+        "Failed to receive buffer from scan range for file $0 at offset $1",
+        filename(), offset));
+  }
+
   parent_->scan_node_->num_owned_io_buffers_.Add(1);
   io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
   io_buffer_bytes_left_ = io_buffer_->len();
@@ -207,7 +218,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
   }
 
   if (boundary_buffer_bytes_left_ > 0) {
-    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
+    DCHECK(ValidateBufferPointers());
     DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
     *out_buffer = boundary_buffer_pos_;
     // Don't return more bytes past eosr
@@ -225,6 +236,9 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     // We're at the end of the boundary buffer and the current IO buffer. Get a new IO
     // buffer and set the current buffer to it.
     RETURN_IF_ERROR(GetNextBuffer());
+    // Check that we're not pointing to the IO buffer if there are bytes left in the
+    // boundary buffer.
+    DCHECK_EQ(boundary_buffer_bytes_left_, 0);
     output_buffer_pos_ = &io_buffer_pos_;
     output_buffer_bytes_left_ = &io_buffer_bytes_left_;
   }
@@ -238,6 +252,7 @@ Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_
     total_bytes_returned_ += *len;
   }
   DCHECK_GE(bytes_left(), 0);
+  DCHECK(ValidateBufferPointers());
   return Status::OK();
 }
 
@@ -254,6 +269,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     }
   }
 
+  DCHECK(ValidateBufferPointers());
+
   while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
     // We must copy the remainder of 'io_buffer_' to 'boundary_buffer_' before advancing
     // to handle the case when the read straddles a block boundary. Preallocate
@@ -262,6 +279,11 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
       RETURN_IF_ERROR(boundary_buffer_->GrowBuffer(requested_len));
       RETURN_IF_ERROR(boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_));
       boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
+
+      // Make state consistent in case we return early with an error below.
+      io_buffer_bytes_left_ = 0;
+      output_buffer_pos_ = &boundary_buffer_pos_;
+      output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
     }
 
     int64_t remaining_requested_len = requested_len - boundary_buffer_->len();
@@ -306,6 +328,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
     }
   }
 
+
+  DCHECK(ValidateBufferPointers());
   return Status::OK();
 }
 
@@ -314,6 +338,14 @@ bool ScannerContext::cancelled() const {
   return static_cast<HdfsScanNode*>(scan_node_)->done();
 }
 
+bool ScannerContext::Stream::ValidateBufferPointers() const {
+  // If there are bytes left in the boundary buffer, the output buffer pointers must point
+  // to it.
+  return boundary_buffer_bytes_left_ == 0 ||
+      (output_buffer_pos_ == &boundary_buffer_pos_ &&
+      output_buffer_bytes_left_ == &boundary_buffer_bytes_left_);
+}
+
 Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
   return Status(TErrorCode::SCANNER_INCOMPLETE_READ, length, bytes_read,
       filename(), file_offset());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 470ec01..bd5623c 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -87,7 +87,7 @@ class ScannerContext {
     ///    If the requested buffer straddles io buffers, a copy is done here.
     ///  - *out_len is the number of bytes returned.
     ///  - *status is set if there is an error.
-    /// Returns true if the call was success (i.e. status->ok())
+    /// Returns true if the call was successful (i.e. status->ok())
     /// This should only be called from the scanner thread.
     /// Note that this will return bytes past the end of the scan range until the end of
     /// the file.
@@ -264,6 +264,9 @@ class ScannerContext {
     /// resources are also freed.
     void ReleaseCompletedResources(RowBatch* batch, bool done);
 
+    /// Validates that the output buffer pointers point to the correct buffer.
+    bool ValidateBufferPointers() const;
+
     /// Error-reporting functions.
     Status ReportIncompleteRead(int64_t length, int64_t bytes_read);
     Status ReportInvalidRead(int64_t length);
@@ -301,7 +304,7 @@ class ScannerContext {
   /// The stream is created in the runtime state's object pool
   Stream* AddStream(DiskIoMgr::ScanRange* range);
 
-  /// Returns false it scan_node_ is multi-threaded and has been cancelled.
+  /// Returns false if scan_node_ is multi-threaded and has been cancelled.
   /// Always returns false if the scan_node_ is not multi-threaded.
   bool cancelled() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index df74fcc..8ed5138 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -291,14 +291,16 @@ Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
     exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
         mtime(), reader_, true);
     if (exclusive_hdfs_fh_ == nullptr) {
-      return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+      return Status(TErrorCode::DISK_IO_ERROR,
+          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
     }
 
     if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
       // Destroy the file handle and remove it from the cache.
       io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
       exclusive_hdfs_fh_ = nullptr;
-      return Status(Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
+      return Status(TErrorCode::DISK_IO_ERROR,
+          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
           GetHdfsErrorMsg("")));
     }
   } else {
@@ -306,19 +308,14 @@ Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
 
     local_file_ = fopen(file(), "r");
     if (local_file_ == nullptr) {
-      string error_msg = GetStrErrMsg();
-      stringstream ss;
-      ss << "Could not open file: " << file_ << ": " << error_msg;
-      return Status(ss.str());
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open file: $0: $1",
+            file_, GetStrErrMsg()));
     }
     if (fseek(local_file_, offset_, SEEK_SET) == -1) {
       fclose(local_file_);
       local_file_ = nullptr;
-      string error_msg = GetStrErrMsg();
-      stringstream ss;
-      ss << "Could not seek to " << offset_ << " for file: " << file_
-         << ": " << error_msg;
-      return Status(ss.str());
+      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to $0 "
+          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
     }
   }
   if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
@@ -425,7 +422,8 @@ Status DiskIoMgr::ScanRange::Read(
       borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
           mtime(), reader_, false);
       if (borrowed_hdfs_fh == nullptr) {
-        return Status(GetHdfsErrorMsg("Failed to open HDFS file ", file_));
+        return Status(TErrorCode::DISK_IO_ERROR,
+            GetHdfsErrorMsg("Failed to open HDFS file ", file_));
       }
       hdfs_file = borrowed_hdfs_fh->file();
     }
@@ -450,7 +448,8 @@ Status DiskIoMgr::ScanRange::Read(
           current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
               buffer + *bytes_read, chunk_size);
           if (current_bytes_read == -1) {
-            status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+            status = Status(TErrorCode::DISK_IO_ERROR,
+                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
           }
         } else {
           // If the file handle is borrowed, it may not be at the appropriate
@@ -458,11 +457,8 @@ Status DiskIoMgr::ScanRange::Read(
           bool seek_failed = false;
           if (borrowed_hdfs_fh != nullptr) {
             if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
-              string error_msg = GetHdfsErrorMsg("");
-              stringstream ss;
-              ss << "Error seeking to " << position_in_file << " in file: "
-                 << file_ << " " << error_msg;
-              status = Status(ss.str());
+              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
+                  "in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
               seek_failed = true;
             }
           }
@@ -470,7 +466,8 @@ Status DiskIoMgr::ScanRange::Read(
             current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
                 chunk_size);
             if (current_bytes_read == -1) {
-              status = Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
+              status = Status(TErrorCode::DISK_IO_ERROR,
+                  GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
             }
           }
         }
@@ -513,11 +510,8 @@ Status DiskIoMgr::ScanRange::Read(
     DCHECK_LE(*bytes_read, bytes_to_read);
     if (*bytes_read < bytes_to_read) {
       if (ferror(local_file_) != 0) {
-        string error_msg = GetStrErrMsg();
-        stringstream ss;
-        ss << "Error reading from " << file_ << " at byte offset: "
-           << (offset_ + bytes_read_) << ": " << error_msg;
-        return Status(ss.str());
+        return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0 "
+            "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
       } else {
         // On Linux, we should only get partial reads from block devices on error or eof.
         DCHECK(feof(local_file_) != 0);
@@ -569,7 +563,6 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
   // between errors and partially cached blocks here.
   if (bytes_read < len()) {
-    stringstream ss;
     VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
       << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
     // Close the scan range. 'read_succeeded' is still false, so the caller will fall back

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 05c99e7..7c60efa 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -61,7 +61,7 @@ class DiskIoMgrTest : public testing::Test {
     if (expected_status.code() == TErrorCode::CANCELLED) {
       EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
     } else {
-      EXPECT_TRUE(status.code() == expected_status.code());
+      EXPECT_EQ(status.code(), expected_status.code());
     }
     if (status.ok()) {
       DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
@@ -261,7 +261,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
           (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
-          Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+          Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
@@ -278,7 +278,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
       new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
-      data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+      data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
 
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 0fe16d2..e77d9ca 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -541,13 +541,16 @@ int64_t DiskIoMgr::GetReadThroughput() {
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
-    return Status(Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
   }
   if (range->offset_ < 0) {
-    return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative offset $0", range->offset_));
   }
   if (range->len_ < 0) {
-    return Status(Substitute("Invalid scan range. Negative length $0", range->len_));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        Substitute("Invalid scan range. Negative length $0", range->len_));
   }
   return Status::OK();
 }
@@ -665,9 +668,9 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader,
 
   if (range->len() > max_buffer_size_
       && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    return Status(Substitute("Internal error: cannot perform sync read of '$0' bytes "
-                   "that is larger than the max read buffer size '$1'.",
-            range->len(), max_buffer_size_));
+    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
+        "perform sync read of '$0' bytes that is larger than the max read buffer size "
+        "'$1'.", range->len(), max_buffer_size_));
   }
 
   vector<DiskIoMgr::ScanRange*> ranges;
@@ -1164,13 +1167,13 @@ void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_ra
   // Raw open() syscall will create file if not present when passed these flags.
   int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
   if (fd < 0) {
-    ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("Opening '$0' for write failed with errno=$1 description=$2",
                                      write_range->file_, errno, GetStrErrMsg())));
   } else {
     file_handle = fdopen(fd, "wb");
     if (file_handle == nullptr) {
-      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
           Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
                                        GetStrErrMsg())));
     }
@@ -1181,7 +1184,7 @@ void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_ra
 
     int success = fclose(file_handle);
     if (ret_status.ok() && success != 0) {
-      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+      ret_status = Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
           Substitute("fclose($0) failed", write_range->file_)));
     }
   }
@@ -1193,7 +1196,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
   // Seek to the correct offset and perform the write.
   int success = fseek(file_handle, write_range->offset(), SEEK_SET);
   if (success != 0) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
         write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
   }
@@ -1205,7 +1208,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
 #endif
   int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
   if (bytes_written < write_range->len_) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+    return Status(ErrorMsg(TErrorCode::DISK_IO_ERROR,
         Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
         write_range->len_, write_range->file_, errno, GetStrErrMsg())));
   }
@@ -1290,7 +1293,8 @@ Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fnam
   *fid = file_handle_cache_.GetFileHandle(fs, fname, mtime, true,
       &cache_hit);
   if (*fid == nullptr) {
-    return Status(GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
+    return Status(TErrorCode::DISK_IO_ERROR,
+        GetHdfsErrorMsg("Failed to open HDFS file ", fname->data()));
   }
   DCHECK(!cache_hit);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index fd1c061..8f48439 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -179,8 +179,10 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
   // If either abort_on_error=true or the error necessitates execution stops
   // immediately, return an error status.
   if (abort_on_error() ||
+      message.error() == TErrorCode::CANCELLED ||
       message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
-      message.error() == TErrorCode::CANCELLED) {
+      message.error() == TErrorCode::INTERNAL_ERROR ||
+      message.error() == TErrorCode::DISK_IO_ERROR) {
     return Status(message);
   }
   // Otherwise, add the error to the error log and continue.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/322e2dc8/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 3c37bef..ad07963 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -333,6 +333,10 @@ error_codes = (
      "Queued reason: $2"),
 
   ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"),
+
+  ("DISK_IO_ERROR", 110, "Disk I/O error: $0"),
+
+
 )
 
 import sys


[6/6] incubator-impala git commit: IMPALA-5905: add script for all-build-options job

Posted by ta...@apache.org.
IMPALA-5905: add script for all-build-options job

This checks in a modified version of the job script for
https://jenkins.impala.io/view/Experimental/job/all-build-options,
adding UBSAN and TSAN builds.

The script is also modified to not reference any Jenkins environment
variables, e.g. WORKSPACE, since the Jenkins job script intermingled
references to those with the script logic.
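
The set of builds the script attempts is the cross product of three option
groups. As a sketch of what gets run (Python used here just to enumerate the
combinations; the real loop is in the bash script below):

import itertools

build_types = ["", "-asan", "-release", "-ubsan", "-tsan"]
ninja = ["", "-ninja"]
shared_libs = ["", "-so"]

for combo in itertools.product(build_types, ninja, shared_libs):
    options = ["-skiptests", "-noclean"] + [o for o in combo if o]
    print("./buildall.sh " + " ".join(options))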

Change-Id: I6e78f05c41e3ccd59af599b00e453e7f88b2bb34
Reviewed-on: http://gerrit.cloudera.org:8080/8043
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6596bebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6596bebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6596bebe

Branch: refs/heads/master
Commit: 6596bebe009e363a0095b2a29b17a73b0b891856
Parents: 322e2dc
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 12 14:13:50 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Wed Sep 13 15:23:06 2017 +0000

----------------------------------------------------------------------
 bin/jenkins/build-all-flag-combinations.sh | 59 +++++++++++++++++++++++++
 1 file changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6596bebe/bin/jenkins/build-all-flag-combinations.sh
----------------------------------------------------------------------
diff --git a/bin/jenkins/build-all-flag-combinations.sh b/bin/jenkins/build-all-flag-combinations.sh
new file mode 100755
index 0000000..cfd6fde
--- /dev/null
+++ b/bin/jenkins/build-all-flag-combinations.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Build Impala with the most common build configurations and check that the build
+# succeeds. Intended for use as a precommit test to make sure nothing got broken.
+#
+# Assumes that ninja and ccache are installed.
+
+set -euo pipefail
+trap 'echo Error in $0 at line $LINENO: $(cd "'$PWD'" && awk "NR == $LINENO" $0)' ERR
+
+OPTIONS=("-skiptests" "-noclean")
+FAILED_OPTIONS=""
+for BUILD_TYPE in "" -asan -release -ubsan -tsan
+do
+  OPTIONS[2]=$BUILD_TYPE
+  for NINJA in "" -ninja
+  do
+    OPTIONS[3]=$NINJA
+    for BUILD_SHARED_LIBS in "" -so
+    do
+      OPTIONS[4]=$BUILD_SHARED_LIBS
+      if ! ./bin/clean.sh
+      then
+        echo "Clean failed"
+        exit 1
+      fi
+      echo "Building with OPTIONS: ${OPTIONS[@]}"
+      if ! time -p ./buildall.sh ${OPTIONS[@]}
+      then
+        echo "Build failed with OPTIONS: ${OPTIONS[@]}"
+        FAILED_OPTIONS="${FAILED_OPTIONS}:${OPTIONS[@]}"
+      fi
+      ccache -s
+    done
+  done
+done
+
+if [[ "$FAILED_OPTIONS" != "" ]]
+  echo "Builds with the following options failed:"
+  echo "$FAILED_OPTIONS"
+  exit 1
+fi


[2/6] incubator-impala git commit: IMPALA-4987: Fix flaky test test_row_availability.py

Posted by ta...@apache.org.
IMPALA-4987: Fix flaky test test_row_availability.py

This patch keeps test_row_availability from randomly failing. The test
measures the interval between the 'Rows available' timeline event and the
previous event in the runtime profile to make sure that rows become available
only after a specific amount of time. That measurement is incorrect because
the previous event marks the coordinator having finished sending the query
fragments to the backends, which means execution on some backends might have
already started. This patch instead uses the 'Ready to start' event as the
beginning of the interval. The coordinator begins sending the query fragments
to the backends after this event, so the time check should always pass.
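
The updated test extracts the raw time of each of the two events from the
profile timeline and compares their difference against the lower bound. A
condensed Python sketch of that parsing (the real code is in the
test_rows_availability.py diff below):

import re

def event_time(profile, event_name):
    # Given a line like '- Rows available: 3s311ms (2s300ms)', return '3s311ms'.
    for line in profile.split("\n"):
        if event_name in line:
            match = re.search(r': (.*) \(', line)
            return match.group(1) if match else None
    return None

profile = ("- Ready to start on 3 backends: 1s000ms (1s000ms)\n"
           "- Rows available: 3s311ms (2s300ms)")
print(event_time(profile, "Ready to start"), event_time(profile, "Rows available"))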

Change-Id: I96142f1868a26426cbc34aa9a0e0a56979df66c3
Reviewed-on: http://gerrit.cloudera.org:8080/8036
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d7e41a37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d7e41a37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d7e41a37

Branch: refs/heads/master
Commit: d7e41a3776991645577412583b2116fa0d3f192a
Parents: 4a5757f
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Mon Sep 11 18:41:30 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 13 00:55:05 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_rows_availability.py | 51 +++++++++++++++----------
 1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d7e41a37/tests/query_test/test_rows_availability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_rows_availability.py b/tests/query_test/test_rows_availability.py
index fa1ecb8..d8db865 100644
--- a/tests/query_test/test_rows_availability.py
+++ b/tests/query_test/test_rows_availability.py
@@ -71,31 +71,40 @@ class TestRowsAvailability(ImpalaTestSuite):
     self.impalad_test_service.wait_for_query_state(self.client, handle,
         self.client.QUERY_STATES['FINISHED'], timeout=20)
 
-    # Parse the query profile for the 'Rows available' timeline event.
-    rows_avail_line = self.__get_rows_available_event(handle)
-    # Match the parenthesized delta duration between the 'Rows available' event
-    # and the previous event.
-    matches = re.search(r'\(.*\)', rows_avail_line)
-    if matches is None:
-      assert False, "Failed to find parenthesized delta time in %s" % rows_avail_line
-    # Strip out parenthesis.
-    rows_avail_ms_str = matches.group(0)[1:-1]
-    rows_avail_ms = self.__parse_duration_ms(rows_avail_ms_str)
-    assert rows_avail_ms >= self.ROWS_AVAIL_LOWER_BOUND_MS,\
+    profile = self.client.get_runtime_profile(handle)
+    start_time_ms = None
+    rows_avail_time_ms = None
+    for line in profile.split("\n"):
+      if "Ready to start on" in line:
+        start_time_ms = self.__parse_time_ms(self.__find_time(line))
+      elif "Rows available:" in line:
+        rows_avail_time_ms = self.__parse_time_ms(self.__find_time(line))
+
+    if start_time_ms is None:
+      assert False, "Failed to find the 'Ready to start' timeline event in the " \
+                    "query profile:\n%s" % profile
+    if rows_avail_time_ms is None:
+      assert False, "Failed to find the 'Rows available' timeline event in the " \
+                    "query profile:\n%s" % profile
+    time_diff = rows_avail_time_ms - start_time_ms
+    assert time_diff >= self.ROWS_AVAIL_LOWER_BOUND_MS,\
         "The 'Rows available' timeline event was marked prematurely %sms after the "\
-        "previous timeline event.\nExpected the event to be marked no earlier than "\
-        "%sms after the previous event.\nQuery: %s"\
-        % (rows_avail_ms, self.ROWS_AVAIL_LOWER_BOUND_MS, query)
+        "'Ready to start' event.\nExpected the event to be marked no earlier than "\
+        "%sms after the 'Ready to start' event.\nQuery: %s"\
+        % (time_diff, self.ROWS_AVAIL_LOWER_BOUND_MS, query)
     self.close_query(handle)
 
-  def __get_rows_available_event(self, query_handle):
-    profile = self.client.get_runtime_profile(query_handle)
-    for line in profile.split("\n"):
-      if "Rows available:" in line: return line
-    assert False, "Failed to find the 'Rows available' timeline event in the "\
-        "query profile:\n%s" % profile
+  @staticmethod
+  def __find_time(line):
+    """Find event time point in a line from the runtime profile timeline."""
+    # Given line "- Rows available: 3s311ms (2s300ms)", this function returns "3s311ms"
+    match = re.search(r': (.*) \(', line)
+    if match is None:
+      assert False, "Failed to find time in runtime profile"
+    return match.group(1)
 
-  def __parse_duration_ms(self, duration):
+  @staticmethod
+  def __parse_time_ms(duration):
     """Parses a duration string of the form 1h2h3m4s5.6ms into milliseconds."""
     matches = re.findall(r'([0-9]+h)?([0-9]+m)?([0-9]+s)?([0-9]+(\.[0-9]+)?ms)?',
                          duration)


[3/6] incubator-impala git commit: IMPALA-3516: Avoid writing to /tmp in testing

Posted by ta...@apache.org.
IMPALA-3516: Avoid writing to /tmp in testing

Currently some parts of the tests write to /tmp:
1. PlannerTest result files are written to /tmp/PlannerTest
2. FE tests load libfesupport, which writes logs to /tmp
3. Updated results in EE tests (run-tests.py --update_results) are
   written to /tmp
This patch changes them to write to $IMPALA_HOME/logs. Specifically:
1. PlannerTest result files are written to
   $IMPALA_FE_TEST_LOGS_DIR/PlannerTest
2. libfesupport logs are written to $IMPALA_FE_TEST_LOGS_DIR
3. Updated EE test results are written to $IMPALA_EE_TEST_LOGS_DIR
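
All three changes follow the same environment-variable fallback pattern; a
minimal Python sketch (the helper name is hypothetical):

import os

def test_log_dir(env_var, default="/tmp"):
    # Prefer the configured test log directory; fall back to /tmp only when
    # the environment variable is unset.
    return os.getenv(env_var) or default

out_dir = test_log_dir("IMPALA_FE_TEST_LOGS_DIR") + "/PlannerTest"

Note the Java change below keeps /tmp/PlannerTest as the fallback when
IMPALA_FE_TEST_LOGS_DIR is not set.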

Change-Id: I9e503eb7d333c1b89dc8aea87cf30504838c44f9
Reviewed-on: http://gerrit.cloudera.org:8080/8047
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/34d63e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/34d63e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/34d63e9d

Branch: refs/heads/master
Commit: 34d63e9dea9765c48ae2040bd25a718d39fc6314
Parents: d7e41a3
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Mon Sep 11 15:34:49 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 13 07:36:04 2017 +0000

----------------------------------------------------------------------
 be/src/service/fe-support.cc                                 | 2 ++
 .../test/java/org/apache/impala/planner/PlannerTestBase.java | 8 +++++++-
 tests/common/impala_test_suite.py                            | 4 +++-
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/34d63e9d/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 2c38b30..54b78a5 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -65,6 +65,8 @@ JNIEXPORT void JNICALL
 Java_org_apache_impala_service_FeSupport_NativeFeTestInit(
     JNIEnv* env, jclass caller_class) {
   DCHECK(ExecEnv::GetInstance() == NULL) << "This should only be called once from the FE";
+  char* env_logs_dir_str = std::getenv("IMPALA_FE_TEST_LOGS_DIR");
+  if (env_logs_dir_str != nullptr) FLAGS_log_dir = env_logs_dir_str;
   char* name = const_cast<char*>("FeSupport");
   // Init the JVM to load the classes in JniUtil that are needed for returning
   // exceptions to the FE.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/34d63e9d/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 6cd4e7a..6ab4b8b 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -87,7 +87,7 @@ public class PlannerTestBase extends FrontendTestBase {
   private final static Logger LOG = LoggerFactory.getLogger(PlannerTest.class);
   private final static boolean GENERATE_OUTPUT_FILE = true;
   private final String testDir_ = "functional-planner/queries/PlannerTest";
-  private final String outDir_ = "/tmp/PlannerTest/";
+  private static String outDir_;
   private static KuduClient kuduClient_;
 
   // Map from plan ID (TPlanNodeId) to the plan node with that ID.
@@ -109,6 +109,12 @@ public class PlannerTestBase extends FrontendTestBase {
     if (RuntimeEnv.INSTANCE.isKuduSupported()) {
       kuduClient_ = new KuduClient.KuduClientBuilder("127.0.0.1:7051").build();
     }
+    String logDir = System.getenv("IMPALA_FE_TEST_LOGS_DIR");
+    if (logDir != null) {
+      outDir_ = logDir + "/PlannerTest";
+    } else {
+      outDir_ = "/tmp/PlannerTest";
+    }
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/34d63e9d/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 1b7d043..b0857e9 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -97,6 +97,7 @@ WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
 HDFS_CONF = HdfsConfig(pytest.config.option.minicluster_xml_conf)
 TARGET_FILESYSTEM = os.getenv("TARGET_FILESYSTEM") or "hdfs"
 IMPALA_HOME = os.getenv("IMPALA_HOME")
+EE_TEST_LOGS_DIR = os.getenv("IMPALA_EE_TEST_LOGS_DIR")
 # Match any SET statement. Assume that query options' names
 # only contain alphabets and underscores.
 SET_PATTERN = re.compile(r'\s*set\s*([a-zA-Z_]+)=*', re.I)
@@ -440,7 +441,8 @@ class ImpalaTestSuite(BaseTestSuite):
             vector.get_value('table_format').file_format,
             pytest.config.option.update_results, result_section='DML_RESULTS')
     if pytest.config.option.update_results:
-      output_file = os.path.join('/tmp', test_file_name.replace('/','_') + ".test")
+      output_file = os.path.join(EE_TEST_LOGS_DIR,
+                                 test_file_name.replace('/','_') + ".test")
       write_test_file(output_file, sections, encoding=encoding)
 
   def execute_test_case_setup(self, setup_section, table_format):