You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/16 02:53:16 UTC

[1/7] impala git commit: IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash

Repository: impala
Updated Branches:
  refs/heads/master 3661100fa -> 75a10a3dd


IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash

Revert "IMPALA-5384, part 2: Simplify Coordinator locking and clarify state"

This reverts commit 6ca87e46736a1e591ed7d7d5fee05b4b4d2fbb50.

Change-Id: Idc63006e6e04130b2873a6a9730e434c563327c5
Reviewed-on: http://gerrit.cloudera.org:8080/10412
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4fab4288
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4fab4288
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4fab4288

Branch: refs/heads/master
Commit: 4fab42883184c247b951dba4237f4303c502d410
Parents: 3661100
Author: Dan Hecht <dh...@cloudera.com>
Authored: Tue May 15 11:36:15 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue May 15 22:24:26 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.h |   8 -
 be/src/runtime/coordinator.cc              | 424 +++++++++++++-----------
 be/src/runtime/coordinator.h               | 330 +++++++++---------
 be/src/service/client-request-state.cc     |   2 +-
 be/src/service/impala-server.h             |   5 +
 be/src/util/counting-barrier.h             |  21 +-
 6 files changed, 395 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e7af2e2..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,17 +21,9 @@
 #include <vector>
 #include <unordered_set>
 
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "runtime/coordinator.h"
-#include "scheduling/query-schedule.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 91f2e29..d87971e 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,7 +25,6 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
-#include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -40,7 +39,6 @@
 #include "runtime/query-state.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
-#include "scheduling/query-schedule.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -53,13 +51,16 @@
 
 using namespace apache::thrift;
 using namespace rapidjson;
+using namespace strings;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::filesystem::path;
+using std::unique_ptr;
 
+DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -75,9 +76,11 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  // Must have entered a terminal exec state guaranteeing resources were released.
-  DCHECK_NE(exec_state_, ExecState::EXECUTING);
-  // Release the coordinator's reference to the query control structures.
+  DCHECK(released_exec_resources_)
+      << "ReleaseExecResources() must be called before Coordinator is destroyed";
+  DCHECK(released_admission_control_resources_)
+      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
+      << "destroyed";
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -106,6 +109,12 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
+  // to keep things simple, make async Cancel() calls wait until plan fragment
+  // execution has been initiated, otherwise we might try to cancel fragment
+  // execution at Impala daemons where it hasn't even started
+  // TODO: revisit this, it may not be true anymore
+  lock_guard<mutex> l(lock_);
+
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
   query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -129,9 +138,9 @@ Status Coordinator::Exec() {
     InitFilterRoutingTable();
   }
 
-  // At this point, all static setup is done and all structures are initialized. Only
-  // runtime-related state changes past this point (examples: fragment instance
-  // profiles, etc.)
+  // At this point, all static setup is done and all structures are initialized.
+  // Only runtime-related state changes past this point (examples:
+  // num_remaining_backends_, fragment instance profiles, etc.)
 
   StartBackendExec();
   RETURN_IF_ERROR(FinishBackendStartup());
@@ -146,7 +155,7 @@ Status Coordinator::Exec() {
       // which means we failed Prepare
       Status prepare_status = query_state_->WaitForPrepare();
       DCHECK(!prepare_status.ok());
-      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
+      return prepare_status;
     }
 
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -160,6 +169,7 @@ Status Coordinator::Exec() {
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
+
   return Status::OK();
 }
 
@@ -198,8 +208,6 @@ void Coordinator::InitFragmentStats() {
 void Coordinator::InitBackendStates() {
   int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
-
-  lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
   RuntimeProfile::Counter* num_backends_counter =
@@ -207,13 +215,19 @@ void Coordinator::InitBackendStates() {
   num_backends_counter->Set(num_backends);
 
   // create BackendStates
+  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
+  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
+    if (has_coord_fragment && coord_address == entry.first) {
+      coord_backend_idx_ = backend_idx;
+    }
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(query_id(), backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
+  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -327,8 +341,8 @@ void Coordinator::InitFilterRoutingTable() {
 
 void Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
-  exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
-  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+  num_remaining_backends_ = num_backends;
 
   DebugOptions debug_options(schedule_.query_options());
 
@@ -340,11 +354,11 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
-              exec_rpcs_complete_barrier_.get());
+            exec_complete_barrier_.get());
         });
   }
-  exec_rpcs_complete_barrier_->Wait();
 
+  exec_complete_barrier_->Wait();
   VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(
@@ -353,24 +367,26 @@ void Coordinator::StartBackendExec() {
 }
 
 Status Coordinator::FinishBackendStartup() {
+  Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
   HistogramMetric latencies(def, 30 * 60 * 1000, 4);
-  Status status = Status::OK();
-  string error_hostname;
   for (BackendState* backend_state: backend_states_) {
     // preserve the first non-OK, if there is one
     Status backend_status = backend_state->GetStatus();
-    if (!backend_status.ok() && status.ok()) {
-      status = backend_status;
-      error_hostname = backend_state->impalad_address().hostname;
-    }
+    if (!backend_status.ok() && status.ok()) status = backend_status;
     latencies.Update(backend_state->rpc_latency());
   }
+
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
-  return UpdateExecState(status, nullptr, error_hostname);
+
+  if (!status.ok()) {
+    query_status_ = status;
+    CancelInternal();
+  }
+  return status;
 }
 
 string Coordinator::FilterDebugString() {
@@ -430,115 +446,40 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-const char* Coordinator::ExecStateToString(const ExecState state) {
-  static const unordered_map<ExecState, const char *> exec_state_to_str{
-    {ExecState::EXECUTING,        "EXECUTING"},
-    {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
-    {ExecState::CANCELLED,        "CANCELLED"},
-    {ExecState::ERROR,            "ERROR"}};
-  return exec_state_to_str.at(state);
+Status Coordinator::GetStatus() {
+  lock_guard<mutex> l(lock_);
+  return query_status_;
 }
 
-Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
-  DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
-  Status ret_status;
-  {
-    lock_guard<SpinLock> l(exec_state_lock_);
-    // May have already entered a terminal state, in which case nothing to do.
-    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
-    DCHECK(exec_status_.ok()) << exec_status_;
-    exec_state_ = state;
-    if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
-    ret_status = exec_status_;
-  }
-  VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
-      state == ExecState::CANCELLED ? "cancelled" : "completed");
-  HandleExecStateTransition(ExecState::EXECUTING, state);
-  return ret_status;
-}
-
-Status Coordinator::UpdateExecState(const Status& status,
-    const TUniqueId* failed_finst, const string& instance_hostname) {
-  Status ret_status;
-  ExecState old_state, new_state;
+Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+    bool is_fragment_failure, const TUniqueId& instance_id) {
   {
-    lock_guard<SpinLock> l(exec_state_lock_);
-    old_state = exec_state_;
-    if (old_state == ExecState::EXECUTING) {
-      DCHECK(exec_status_.ok()) << exec_status_;
-      if (!status.ok()) {
-        // Error while executing - go to ERROR state.
-        exec_status_ = status;
-        exec_state_ = ExecState::ERROR;
-      }
-    } else if (old_state == ExecState::RETURNED_RESULTS) {
-      // Already returned all results. Leave exec status as ok, stay in this state.
-      DCHECK(exec_status_.ok()) << exec_status_;
-    } else if (old_state == ExecState::CANCELLED) {
-      // Client requested cancellation already, stay in this state.  Ignores errors
-      // after requested cancellations.
-      DCHECK(exec_status_.IsCancelled()) << exec_status_;
-    } else {
-      // Already in the ERROR state, stay in this state but update status to be the
-      // first non-cancelled status.
-      DCHECK_EQ(old_state, ExecState::ERROR);
-      DCHECK(!exec_status_.ok());
-      if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
-        exec_status_ = status;
-      }
-    }
-    new_state = exec_state_;
-    ret_status = exec_status_;
-  }
-  // Log interesting status: a non-cancelled error or a cancellation if was executing.
-  if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
-    VLOG_QUERY << Substitute(
-        "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
-        PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
-        instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
-        status.GetDetail());
-  }
-  // After dropping the lock, apply the state transition (if any) side-effects.
-  HandleExecStateTransition(old_state, new_state);
-  return ret_status;
-}
-
-bool Coordinator::ReturnedAllResults() {
-  lock_guard<SpinLock> l(exec_state_lock_);
-  return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
-void Coordinator::HandleExecStateTransition(
-    const ExecState old_state, const ExecState new_state) {
-  static const unordered_map<ExecState, const char *> exec_state_to_event{
-    {ExecState::EXECUTING,        "Executing"},
-    {ExecState::RETURNED_RESULTS, "Last row fetched"},
-    {ExecState::CANCELLED,        "Execution cancelled"},
-    {ExecState::ERROR,            "Execution error"}};
-  if (old_state == new_state) return;
-  // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
-  DCHECK_EQ(old_state, ExecState::EXECUTING);
-  // Should never transition to the initial state.
-  DCHECK_NE(new_state, ExecState::EXECUTING);
-
-  query_events_->MarkEvent(exec_state_to_event.at(new_state));
-  // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of
-  // cancel rpc. And in the case of EOS, sink already knows this.
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
-  // This thread won the race to transitioning into a terminal state - terminate
-  // execution and release resources.
-  ReleaseExecResources();
-  if (new_state == ExecState::RETURNED_RESULTS) {
-    // TODO: IMPALA-6984: cancel all backends in this case too.
-    WaitForBackends();
+    lock_guard<mutex> l(lock_);
+
+    // The query is done and we are just waiting for backends to clean up.
+    // Ignore their cancelled updates.
+    if (returned_all_results_ && status.IsCancelled()) return query_status_;
+
+    // nothing to update
+    if (status.ok()) return query_status_;
+
+    // don't override an error status; also, cancellation has already started
+    if (!query_status_.ok()) return query_status_;
+
+    query_status_ = status;
+    CancelInternal();
+  }
+
+  if (is_fragment_failure) {
+    // Log the id of the fragment that first failed so we can track it down more easily.
+    VLOG_QUERY << "query_id=" << PrintId(query_id())
+               << " failed because fragment_instance_id=" << PrintId(instance_id)
+               << " on host=" << backend_hostname << " failed.";
   } else {
-    CancelBackends();
+    VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
+               << backend_hostname;
   }
-  ReleaseAdmissionControlResources();
-  // Can compute summary only after we stop accepting reports from the backends. Both
-  // WaitForBackends() and CancelBackends() ensures that.
-  // TODO: should move this off of the query execution path?
-  ComputeQuerySummary();
+  return query_status_;
 }
 
 Status Coordinator::FinalizeHdfsInsert() {
@@ -550,7 +491,7 @@ Status Coordinator::FinalizeHdfsInsert() {
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
-  Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
+  Status return_status = GetStatus();
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
     RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -576,13 +517,22 @@ Status Coordinator::FinalizeHdfsInsert() {
   return return_status;
 }
 
-void Coordinator::WaitForBackends() {
-  int32_t num_remaining = backend_exec_complete_barrier_->pending();
-  if (num_remaining > 0) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
-               << " remaining. query_id=" << PrintId(query_id());
-    backend_exec_complete_barrier_->Wait();
+Status Coordinator::WaitForBackendCompletion() {
+  unique_lock<mutex> l(lock_);
+  while (num_remaining_backends_ > 0 && query_status_.ok()) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, "
+               << num_remaining_backends_ << " remaining. query_id="
+               << PrintId(query_id());
+    backend_completion_cv_.Wait(l);
   }
+  if (query_status_.ok()) {
+    VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
+  } else {
+    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
+               << PrintId(query_id()) << ". " << query_status_.GetDetail();
+  }
+
+  return query_status_;
 }
 
 Status Coordinator::Wait() {
@@ -593,22 +543,34 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateExecState(coord_instance_->WaitForOpen(),
-        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
+    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
+        runtime_state()->fragment_instance_id());
   }
+
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // DML finalization can only happen when all backends have completed all side-effects
-  // and reported relevant state.
-  WaitForBackends();
-  if (finalize_params() != nullptr) {
-    RETURN_IF_ERROR(UpdateExecState(
-            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
-  }
-  // DML requests are finished at this point.
-  RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+  // Query finalization can only happen when all backends have reported relevant
+  // state. They only have relevant state to report in the parallel INSERT case,
+  // otherwise all the relevant state is from the coordinator fragment which will be
+  // available after Open() returns.  Ignore the returned status if finalization is
+  // required., since FinalizeHdfsInsert() will pick it up and needs to execute
+  // regardless.
+  Status status = WaitForBackendCompletion();
+  if (finalize_params() == nullptr && !status.ok()) return status;
+
+  // Execution of query fragments has finished. We don't need to hold onto query execution
+  // resources while we finalize the query.
+  ReleaseExecResources();
+  // Query finalization is required only for HDFS table sinks
+  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
+  // Release admission control resources after we'd done the potentially heavyweight
+  // finalization.
+  ReleaseAdmissionControlResources();
+
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
-  return Status::OK();
+  // For DML queries, when Wait is done, the query is complete.
+  ComputeQuerySummary();
+  return status;
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -616,54 +578,88 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  // exec_state_lock_ not needed here though since this path won't execute concurrently
-  // with itself or DML finalization.
-  if (exec_state_ == ExecState::RETURNED_RESULTS) {
-    // Nothing left to do: already in a terminal state and no more results.
+  if (returned_all_results_) {
+    // May be called after the first time we set *eos. Re-set *eos and return here;
+    // already torn-down coord_sink_ so no more work to do.
     *eos = true;
     return Status::OK();
   }
-  DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
-  DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
-  RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
-  RETURN_IF_ERROR(UpdateExecState(
-          status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
-  if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+  DCHECK(coord_sink_ != nullptr)
+      << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
+      << "checked?";
+  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
+
+  // if there was an error, we need to return the query's error status rather than
+  // the status we just got back from the local executor (which may well be CANCELLED
+  // in that case).  Coordinator fragment failed in this case so we log the query_id.
+  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
+      runtime_state()->fragment_instance_id()));
+
+  if (*eos) {
+    returned_all_results_ = true;
+    query_events_->MarkEvent("Last row fetched");
+    // release query execution resources here, since we won't be fetching more result rows
+    ReleaseExecResources();
+    // wait for all backends to complete before computing the summary
+    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
+    RETURN_IF_ERROR(WaitForBackendCompletion());
+    // Release admission control resources after backends are finished.
+    ReleaseAdmissionControlResources();
+    // if the query completed successfully, compute the summary
+    if (query_status_.ok()) ComputeQuerySummary();
+  }
+
   return Status::OK();
 }
 
-void Coordinator::Cancel() {
-  // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
-  // RPC passing the exec RPC.
-  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
-      exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
-  discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
+void Coordinator::Cancel(const Status* cause) {
+  lock_guard<mutex> l(lock_);
+  // if the query status indicates an error, cancellation has already been initiated;
+  // prevent others from cancelling a second time
+  if (!query_status_.ok()) return;
+
+  // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
+  // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
+  // of a successful query. Need to clean up relationship between query_status_ here and
+  // in QueryExecState. See IMPALA-4279.
+  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
+  CancelInternal();
 }
 
-void Coordinator::CancelBackends() {
+void Coordinator::CancelInternal() {
+  VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
+  // TODO: remove when restructuring cancellation, which should happen automatically
+  // as soon as the coordinator knows that the query is finished
+  DCHECK(!query_status_.ok());
+
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
     DCHECK(backend_state != nullptr);
     if (backend_state->Cancel()) ++num_cancelled;
   }
-  backend_exec_complete_barrier_->NotifyRemaining();
-
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
+  backend_completion_cv_.NotifyAll();
+
+  ReleaseExecResourcesLocked();
+  ReleaseAdmissionControlResourcesLocked();
+  // Report the summary with whatever progress the query made before being cancelled.
+  ComputeQuerySummary();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
-            << " backend_idx=" << params.coord_state_idx;
+  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
   if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
+  // TODO: return here if returned_all_results_?
+  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
+  // path more irregular.
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -672,30 +668,46 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
-    // This backend execution has completed.
+    // This report made this backend done, so update the status and
+    // num_remaining_backends_.
+
+    // for now, abort the query if we see any error except if returned_all_results_ is
+    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
+    // TODO: clarify control flow here, it's unclear we should even process this status
+    // report if returned_all_results_ is true
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    int pending_backends = backend_exec_complete_barrier_->Notify();
-    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
-      VLOG_QUERY << "Backend completed:"
-                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-                 << " remaining=" << pending_backends
-                 << " query_id=" << PrintId(query_id());
+    if (!status.ok() && !returned_all_results_) {
+      Status ignored =
+          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
+              is_fragment_failure, failed_instance_id);
+      return Status::OK();
+    }
+
+    lock_guard<mutex> l(lock_);
+    DCHECK_GT(num_remaining_backends_, 0);
+    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
+      VLOG_QUERY << "Backend completed: "
+          << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+          << " remaining=" << num_remaining_backends_ - 1
+          << " query_id=" << PrintId(query_id());
       BackendState::LogFirstInProgress(backend_states_);
     }
-    if (!status.ok()) {
-      // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than
-      // from within this RPC handler (since it can make RPCs).
-      discard_result(UpdateExecState(status,
-              is_fragment_failure ? &failed_instance_id : nullptr,
-              TNetworkAddressToString(backend_state->impalad_address())));
+    if (--num_remaining_backends_ == 0 || !status.ok()) {
+      backend_completion_cv_.NotifyAll();
     }
+    return Status::OK();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.
-  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
-  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
+  if (returned_all_results_) return Status::CANCELLED;
+
+  return Status::OK();
+}
+
+RuntimeState* Coordinator::runtime_state() {
+  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }
 
 // TODO: add histogram/percentile
@@ -728,14 +740,20 @@ void Coordinator::ComputeQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
-    for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
+  for (BackendState* state: backend_states_) {
+    state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
 void Coordinator::ReleaseExecResources() {
+  lock_guard<mutex> l(lock_);
+  ReleaseExecResourcesLocked();
+}
+
+void Coordinator::ReleaseExecResourcesLocked() {
+  if (released_exec_resources_) return;
+  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -749,6 +767,8 @@ void Coordinator::ReleaseExecResources() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
+  // Need to protect against failed Prepare(), where root_sink() would not be set.
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // Now that we've released our own resources, can release query-wide resources.
   if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -756,21 +776,28 @@ void Coordinator::ReleaseExecResources() {
 }
 
 void Coordinator::ReleaseAdmissionControlResources() {
-  LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
+  lock_guard<mutex> l(lock_);
+  ReleaseAdmissionControlResourcesLocked();
+}
+
+void Coordinator::ReleaseAdmissionControlResourcesLocked() {
+  if (released_admission_control_resources_) return;
+  LOG(INFO) << "Release admission control resources for query_id="
+            << PrintId(query_ctx().query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   DCHECK(admission_controller != nullptr);
   admission_controller->ReleaseQuery(schedule_);
+  released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
+  DCHECK(exec_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
-
-  exec_rpcs_complete_barrier_->Wait();
+  exec_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
@@ -841,7 +868,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params.__set_dst_query_id(query_id());
   rpc_params.__set_filter_id(params.filter_id);
 
-  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -915,19 +941,23 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
+const TUniqueId& Coordinator::query_id() const {
+  return query_ctx().query_id;
+}
+
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
 }
 
 MemTracker* Coordinator::query_mem_tracker() const {
-  return query_state_->query_mem_tracker();
+  return query_state()->query_mem_tracker();
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
+    lock_guard<mutex> l(lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->ToJson(&val, doc);
@@ -940,7 +970,7 @@ void Coordinator::BackendsToJson(Document* doc) {
 void Coordinator::FInstanceStatsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
+    lock_guard<mutex> l(lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->InstanceStatsToJson(&val, doc);
@@ -950,14 +980,6 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
 
-const TQueryCtx& Coordinator::query_ctx() const {
-  return schedule_.request().query_ctx;
-}
-
-const TUniqueId& Coordinator::query_id() const {
-  return query_ctx().query_id;
-}
-
 const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36c9f26..723047b 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,15 +20,26 @@
 
 #include <string>
 #include <vector>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
+#include <boost/unordered_set.hpp>
 #include <rapidjson/document.h>
 
 #include "common/global-types.h"
+#include "common/hdfs.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
+#include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -45,7 +56,6 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class QueryResultSet;
-class QuerySchedule;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
@@ -55,9 +65,10 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either by returning EOS, through
-/// client cancellation, returning an error, or by finalizing a DML request, the
-/// coordinator releases resources.
+/// query, including cancellation. Once a query ends, either through cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -66,80 +77,80 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Thread-safe except where noted.
-///
+/// Thread-safe, with the exception of GetNext().
+//
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
-/// A query is considered to be executing until one of three things occurs:
-/// 1. An error is encountered. Backend cancellation is automatically initiated for all
-///    backends that haven't yet completed and the overall query status is set to the
-///    first (non-cancelled) encountered error status.
-/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
-///    is set to CANCELLED and cancellation is initiated for all backends still
-///    executing (without an error status).
-/// 3. The query has returned all rows. The overall query status is OK (and remains
-///    OK). Client cancellation is no longer possible and subsequent backend errors are
-///    ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
-///
-/// Lifecycle: this object must not be destroyed until after one of the three states
-/// above is reached (error, cancelled, or EOS) to ensure resources are released.
-///
-/// Lock ordering: (lower-numbered acquired before higher-numbered)
-/// 1. wait_lock_
-/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
-///    ExecSummary::lock (leafs)
+/// The implementation ensures that setting an overall error status and initiating
+/// cancellation of all fragment instances is atomic.
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
+/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
+/// TODO: clarify cancellation path; in particular, cancel as soon as we return
+/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
   Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. When it
-  /// returns, all fragment instances have started executing at their respective
-  /// backends. Exec() must be called exactly once and a call to Exec() must precede
-  /// all other member function calls.
+  /// Initiate asynchronous execution of a query with the given schedule. When it returns,
+  /// all fragment instances have started executing at their respective backends.
+  /// A call to Exec() must precede all other member function calls.
   Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
-  /// query doesn't return rows, until the query finishes or is cancelled. A call to
-  /// Wait() must precede all calls to GetNext().  Multiple calls to Wait() are
-  /// idempotent and it is okay to issue multiple Wait() calls concurrently.
+  /// query doesn't return rows, until the query finishes or is cancelled.
+  /// A call to Wait() must precede all calls to GetNext().
+  /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
+  /// Wait() calls concurrently.
   Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
-  /// rows, but will not return more. If *eos is true, all rows have been returned.
-  /// Returns a non-OK status if an error was encountered either locally or by any of
-  /// the executing backends, or if the query was cancelled via Cancel().  After *eos
-  /// is true, subsequent calls to GetNext() will be a no-op.
+  /// rows, but will not return more.
+  ///
+  /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
+  /// no-op.
+  ///
+  /// GetNext() will not set *eos=true until all fragment instances have either completed
+  /// or have failed.
+  ///
+  /// Returns an error status if an error was encountered either locally or by any of the
+  /// remote fragments or if the query was cancelled.
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
-  /// calls.
+  /// calls (but may call any of the other member functions concurrently with GetNext()).
   Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Cancel execution of query and sets the overall query status to CANCELLED if the
-  /// query is still executing. Idempotent.
-  void Cancel();
+  /// Cancel execution of query. This includes the execution of the local plan fragment,
+  /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
+  /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
+  /// Idempotent.
+  void Cancel(const Status* cause = nullptr);
 
-  /// Called by the report status RPC handler to update execution status of a
-  /// particular backend as well as dml_exec_state_ and the profile.
+  /// Updates execution status of a particular backend as well as dml_exec_state_.
+  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+  /// error status.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
+  /// Only valid to call after Exec().
+  QueryState* query_state() const { return query_state_; }
+
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_; }
 
-  /// Safe to call only after Exec().
+  const TUniqueId& query_id() const;
+
   MemTracker* query_mem_tracker() const;
 
-  /// Safe to call only after Wait().
+  /// This is safe to call only after Wait()
   DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
@@ -148,6 +159,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const ProgressUpdater& progress() { return progress_; }
 
+  /// Returns query_status_.
+  Status GetStatus();
+
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);
 
@@ -174,20 +188,18 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
-  /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
+  /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
   TStmtType::type stmt_type_;
 
-  /// BackendStates for all execution backends, including the coordinator. All elements
-  /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
+  /// BackendStates for all execution backends, including the coordinator.
+  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
+  /// InitBackendExec().
   std::vector<BackendState*> backend_states_;
 
-  /// Protects the population of backend_states_ vector (not the BackendState objects).
-  /// Used when accessing backend_states_ if it's not guaranteed that
-  /// InitBackendStates() has completed.
-  SpinLock backend_states_init_lock_;
+  // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment
+  int coord_backend_idx_ = -1;
 
-  /// The QueryState for this coordinator. Reference taken in Exec(). Reference
-  /// released in destructor.
+  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
   QueryState* query_state_ = nullptr;
 
   /// Non-null if and only if the query produces results for the client; i.e. is of
@@ -198,10 +210,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Owned by the QueryState. Set in Exec().
+  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+  /// reference of QueryState released) in TearDown().
   FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// Owned by the QueryState. Set in Exec().
+  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
+  /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
   /// ensures single-threaded execution of Wait(). See lock ordering class comment.
@@ -209,17 +223,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
-  /// Keeps track of number of completed ranges and total scan ranges. Initialized by
-  /// Exec().
+  /// Keeps track of number of completed ranges and total scan ranges.
   ProgressUpdater progress_;
 
-  /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
-  RuntimeProfile* query_profile_ = nullptr;
-
-  /// Total time spent in finalization (typically 0 except for INSERT into hdfs
-  /// tables). Set in Exec().
-  RuntimeProfile::Counter* finalization_timer_ = nullptr;
-
   /// Total number of filter updates received (always 0 if filter mode is not
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -250,7 +256,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     void Init(const QuerySchedule& query_schedule);
   };
 
-  // Initialized by Exec().
   ExecSummary exec_summary_;
 
   /// Filled in as the query completes and tracks the results of DML queries.  This is
@@ -258,40 +263,52 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// coordinator fragment: only one of the two can legitimately produce updates.
   DmlExecState dml_exec_state_;
 
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+  RuntimeProfile* query_profile_ = nullptr;
+
+  /// Protects all fields below. This is held while making RPCs, so this lock should
+  /// only be acquired if the acquiring thread is prepared to wait for a significant
+  /// time.
+  /// TODO: clarify to what extent the fields below need to be protected by lock_
+  /// Lock ordering is
+  /// 1. wait_lock_
+  /// 2. lock_
+  /// 3. BackendState::lock_
+  /// 4. filter_lock_
+  boost::mutex lock_;
+
+  /// Overall status of the entire query; set to the first reported fragment error
+  /// status or to CANCELLED, if Cancel() is called.
+  Status query_status_;
+
+  /// If true, the query is done returning all results.  It is possible that the
+  /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
+  /// with limit)
+  /// Once this is set to true, errors from execution backends are ignored.
+  bool returned_all_results_ = false;
+
+  /// If there is no coordinator fragment, Wait() simply waits until all
+  /// backends report completion by notifying on backend_completion_cv_.
+  /// Tied to lock_.
+  ConditionVariable backend_completion_cv_;
+
+  /// Count of the number of backends for which done != true. When this
+  /// hits 0, any Wait()'ing thread is notified
+  int num_remaining_backends_ = 0;
+
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-  /// Indexed by fragment idx (TPlanFragment.idx). Filled in
-  /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
-  /// sequentially, without synchronization.
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(),
+  /// elements live in obj_pool().
   std::vector<FragmentStats*> fragment_stats_;
 
-  /// Barrier that is released when all calls to BackendState::Exec() have
-  /// returned. Initialized in StartBackendExec().
-  boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
-
-  /// Barrier that is released when all backends have indicated execution completion,
-  /// or when all backends are cancelled due to an execution error or client requested
-  /// cancellation. Initialized in StartBackendExec().
-  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
-
-  SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
-
-  /// EXECUTING: in-flight; the only non-terminal state
-  /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
-  /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
-  /// ERROR: received an error from a backend
-  enum class ExecState {
-    EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
-  };
-  ExecState exec_state_ = ExecState::EXECUTING;
+  /// total time spent in finalization (typically 0 except for INSERT into hdfs tables)
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
 
-  /// Overall execution status; only set on exec_state_ transitions:
-  /// - EXECUTING: OK
-  /// - RETURNED_RESULTS: OK
-  /// - CANCELLED: CANCELLED
-  /// - ERROR: error status
-  Status exec_status_;
+  /// Barrier that is released when all calls to ExecRemoteFragment() have
+  /// returned, successfully or not. Initialised during Exec().
+  boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
@@ -304,6 +321,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
+  /// True if and only if ReleaseExecResources() has been called.
+  bool released_exec_resources_ = false;
+
+  /// True if and only if ReleaseAdmissionControlResources() has been called.
+  bool released_admission_control_resources_ = false;
+
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -311,67 +334,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// HDFS INSERT finalization is not required.
   const TFinalizeParams* finalize_params() const;
 
-  const TQueryCtx& query_ctx() const;
+  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
 
-  const TUniqueId& query_id() const;
+  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
+  /// produce any rows.
+  RuntimeState* runtime_state();
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Called when the query is done executing due to reaching EOS or client
-  /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
-  /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
-  /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
-  /// either way, calls ComputeQuerySummary() and releases resources). Returns the
-  /// resulting overall execution status.
-  Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
-
-  /// Transitions 'exec_state_' given an execution status and returns the resulting
-  /// overall status:
-  ///
-  /// - if the 'status' parameter is ok, no state transition
-  /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
-  /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
-  ///   transition state (those are terminal states) however in the case of ERROR,
-  ///   status may be updated to a more interesting status.
-  ///
-  /// Should not be called for (client initiated) cancellation. Call
-  /// SetNonErrorTerminalState(CANCELLED) instead.
-  ///
-  /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
-  /// failure is not specific to a fragment instance), used for error reporting along
-  /// with 'instance_hostname'.
-  Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
-      const string& instance_hostname) WARN_UNUSED_RESULT;
-
-  /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller
-  /// transitioned to a terminal state (which happens exactly once for the lifetime of
-  /// the Coordinator object), then finalizes execution (cancels remaining backends if
-  /// transitioning to CANCELLED; in all cases releases resources and calls
-  /// ComputeQuerySummary().
-  void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
-
-  /// Return true if 'exec_state_' is RETURNED_RESULTS.
-  /// TODO: remove with IMPALA-6984.
-  bool ReturnedAllResults() WARN_UNUSED_RESULT;
-
-  /// Return the string representation of 'state'.
-  static const char* ExecStateToString(const ExecState state);
-
-  // For DCHECK_EQ, etc of ExecState values.
-  friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
-    return o << ExecStateToString(s);
-  }
-
-  /// Helper for HandleExecStateTransition(). Sends cancellation request to all
-  /// executing backends but does not wait for acknowledgement from the backends. The
-  /// ExecState state-machine ensures this is called at most once.
-  void CancelBackends();
-
-  /// Returns only when either all execution backends have reported success or a request
-  /// to cancel the backends has already been sent. It is safe to call this concurrently,
-  /// but any calls must be made only after Exec().
-  void WaitForBackends();
+  /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when
+  /// the query is not being cancelled in the case where the query limit is reached.
+  void CancelInternal();
+
+  /// Acquires lock_ and updates query_status_ with 'status' if it's not already
+  /// an error status, and returns the current query_status_. The status may be
+  /// due to an error in a specific fragment instance, or it can be a general error
+  /// not tied to a specific fragment instance.
+  /// Calls CancelInternal() when switching to an error status.
+  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
+  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
+  /// reporting. For a general error not tied to a specific instance,
+  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
+  /// 'backend_hostname' is used for error reporting in either case.
+  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
+      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
+
+  /// Returns only when either all execution backends have reported success or the query
+  /// is in error. Returns the status of the query.
+  /// It is safe to call this concurrently, but any calls must be made only after Exec().
+  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
 
   /// Initializes fragment_stats_ and query_profile_. Must be called before
   /// InitBackendStates().
@@ -393,33 +385,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// finishing the INSERT in flight.
   Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
-  /// Helper for Exec(). Populates backend_states_, starts query execution at all
-  /// backends in parallel, and blocks until startup completes.
+  /// Populates backend_states_, starts query execution at all backends in parallel, and
+  /// blocks until startup completes.
   void StartBackendExec();
 
-  /// Helper for Exec(). Checks for errors encountered when starting backend execution,
-  /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram.
+  /// Calls CancelInternal() and returns an error if there was any error starting
+  /// backend execution.
+  /// Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Helper for HandleExecStateTransition(). Releases all resources associated with
-  /// query execution. The ExecState state-machine ensures this is called exactly once.
+  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
   void ReleaseExecResources();
 
-  /// Helper for HandleExecStateTransition(). Releases admission control resources for
-  /// use by other queries. This should only be called if one of following
-  /// preconditions is satisfied for each backend on which the query is executing:
-  ///
-  /// * The backend finished execution.  Rationale: the backend isn't consuming
-  ///   resources.
+  /// Same as ReleaseExecResources() except the lock must be held by the caller.
+  void ReleaseExecResourcesLocked();
+
+  /// Releases admission control resources for use by other queries.
+  /// This should only be called if one of following preconditions is satisfied for each
+  /// backend on which the query is executing:
+  /// * The backend finished execution.
+  ///   Rationale: the backend isn't consuming resources.
+  //
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
   ///   time a newly admitted query fragment starts up on the backend and starts consuming
   ///   resources, the resources from this query will probably have been released.
+  //
   /// * Sending the cancellation RPC to the backend failed
   ///   Rationale: the backend is either down or will tear itself down when it next tries
   ///   to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -430,13 +425,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///   pessimistically queueing queries while we wait for a response from a backend that
   ///   may never come.
   ///
-  /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
-  /// to satisfy the above preconditions. If the query has an expensive finalization
-  /// step post query execution (e.g. a DML statement), then this should be called
-  /// after that completes to avoid over-admitting queries.
+  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
+  /// sufficient to satisfy the above preconditions. If the query has an expensive
+  /// finalization step post query execution (e.g. a DML statement), then this should
+  /// be called after that completes to avoid over-admitting queries.
   ///
-  /// The ExecState state-machine ensures this is called exactly once.
+  /// Acquires lock_. Idempotent.
   void ReleaseAdmissionControlResources();
+
+  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
+  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 64cf219..5186c6f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
 
   // Cancel the parent query. 'lock_' should not be held because cancellation involves
   // RPCs and can block for a long time.
-  if (coord != NULL) coord->Cancel();
+  if (coord != NULL) coord->Cancel(cause);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 3af4c9b..fb3f261 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,6 +111,11 @@ class ClientRequestState;
 /// 6. ClientRequestState::expiration_data_lock_
 /// 7. Coordinator::exec_summary_lock
 ///
+/// Coordinator::lock_ should not be acquired at the same time as the
+/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
+/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of
+/// the above lock ordering.
+///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
 /// * session_timeout_lock_

http://git-wip-us.apache.org/repos/asf/impala/blob/4fab4288/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 827c526..49b0bde 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,23 +33,8 @@ class CountingBarrier {
   }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
-  /// Returns the remaining pending notifications.
-  int32_t Notify() {
-    int32_t result = count_.Add(-1);
-    if (result == 0) promise_.Set(true);
-    return result;
-  }
-
-  /// Sets the number of pending notifications to 0 and unblocks Wait().
-  void NotifyRemaining() {
-    while (true) {
-      int32_t value = count_.Load();
-      if (value <= 0) return;  // count_ can legitimately drop below 0
-      if (count_.CompareAndSwap(value, 0)) {
-        promise_.Set(true);
-        return;
-      }
-    }
+  void Notify() {
+    if (count_.Add(-1) == 0) promise_.Set(true);
   }
 
   /// Blocks until all notifications are received.
@@ -59,8 +44,6 @@ class CountingBarrier {
   /// case '*timed_out' will be true.
   void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
 
-  int32_t pending() const { return count_.Load(); }
-
  private:
   /// Used to signal waiters when all notifications are received.
   Promise<bool> promise_;


[7/7] impala git commit: IMPALA-6802 (part 3): Clean up authorization tests

Posted by ta...@apache.org.
IMPALA-6802 (part 3): Clean up authorization tests

The third part of this patch is to rewrite the following authorization
tests:
- with
- union
- reset metadata
- show

Testing:
- Added new authorization tests
- Ran all front-end tests

Change-Id: I9681cc3c7094db33ab7c5caa69b99dd803b908b7
Cherry-picks: not for 2.x
Reviewed-on: http://gerrit.cloudera.org:8080/10358
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/75a10a3d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/75a10a3d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/75a10a3d

Branch: refs/heads/master
Commit: 75a10a3dda80137120ecb9c1871e6bdfb74f6522
Parents: be2d61a
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Wed May 9 11:27:50 2018 -0500
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed May 16 02:50:30 2018 +0000

----------------------------------------------------------------------
 .../impala/analysis/AuthorizationTestV2.java    | 510 ++++++++++++++-----
 1 file changed, 396 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/75a10a3d/fe/src/test/java/org/apache/impala/analysis/AuthorizationTestV2.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTestV2.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTestV2.java
index 9845368..59ea4a1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationTestV2.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationTestV2.java
@@ -26,11 +26,14 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.AuthorizationException;
 import org.apache.impala.catalog.Role;
 import org.apache.impala.catalog.RolePrivilege;
+import org.apache.impala.catalog.ScalarFunction;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.testutil.ImpaladTestCatalog;
+import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TPrivilegeLevel;
 import org.apache.impala.thrift.TPrivilegeScope;
@@ -89,7 +92,7 @@ public class AuthorizationTestV2 extends FrontendTestBase {
 
   @Test
   public void testPrivilegeRequests() throws ImpalaException {
-    // Select *
+    // Used for select *, with, and union
     Set<String> expectedAuthorizables = Sets.newHashSet(
         "functional.alltypes",
         "functional.alltypes.id",
@@ -106,6 +109,7 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         "functional.alltypes.year",
         "functional.alltypes.month"
     );
+    // Select *
     verifyPrivilegeReqs("select * from functional.alltypes", expectedAuthorizables);
     verifyPrivilegeReqs("select alltypes.* from functional.alltypes", expectedAuthorizables);
     verifyPrivilegeReqs(createAnalysisCtx("functional"), "select * from alltypes",
@@ -114,6 +118,19 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         "select alltypes.* from alltypes", expectedAuthorizables);
     verifyPrivilegeReqs("select a.* from functional.alltypes a", expectedAuthorizables);
 
+    // With clause.
+    verifyPrivilegeReqs("with t as (select * from functional.alltypes) select * from t",
+        expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"),
+        "with t as (select * from alltypes) select * from t", expectedAuthorizables);
+
+    // Union.
+    verifyPrivilegeReqs("select * from functional.alltypes union all " +
+        "select * from functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"),
+        "select * from alltypes union all select * from alltypes",
+        expectedAuthorizables);
+
     // Select a specific column.
     expectedAuthorizables = Sets.newHashSet(
         "functional.alltypes",
@@ -163,28 +180,87 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         "'hdfs://localhost:20500/test-warehouse/tpch.lineitem' " +
         "into table alltypes partition(month=10, year=2009)",
         expectedAuthorizables);
+
+    // Reset metadata.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes");
+    verifyPrivilegeReqs("invalidate metadata functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "invalidate metadata alltypes",
+        expectedAuthorizables);
+    verifyPrivilegeReqs("refresh functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "refresh alltypes",
+        expectedAuthorizables);
+
+    // Show tables.
+    expectedAuthorizables = Sets.newHashSet("functional.*.*");
+    verifyPrivilegeReqs("show tables in functional", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "show tables",
+        expectedAuthorizables);
+
+    // Show partitions.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes");
+    verifyPrivilegeReqs("show partitions functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "show partitions alltypes",
+        expectedAuthorizables);
+
+    // Show range partitions.
+    expectedAuthorizables = Sets.newHashSet("functional_kudu.dimtbl");
+    verifyPrivilegeReqs("show range partitions functional_kudu.dimtbl",
+        expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional_kudu"),
+        "show range partitions dimtbl", expectedAuthorizables);
+
+    // Show table stats.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes");
+    verifyPrivilegeReqs("show table stats functional.alltypes",
+        expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "show table stats alltypes",
+        expectedAuthorizables);
+
+    // Show column stats.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes");
+    verifyPrivilegeReqs("show column stats functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"), "show column stats alltypes",
+        expectedAuthorizables);
+
+    // Show create table.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes");
+    verifyPrivilegeReqs("show create table functional.alltypes", expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"),
+        "show create table functional.alltypes", expectedAuthorizables);
+
+    // Show create view.
+    expectedAuthorizables = Sets.newHashSet("functional.alltypes_view");
+    verifyPrivilegeReqs("show create view functional.alltypes_view",
+        expectedAuthorizables);
+    verifyPrivilegeReqs(createAnalysisCtx("functional"),
+        "show create view functional.alltypes_view", expectedAuthorizables);
   }
 
   @Test
   public void testSelect() throws ImpalaException {
-    // Select a specific column on a table.
-    authorize("select id from functional.alltypes")
-        .ok(onServer(TPrivilegeLevel.ALL))
-        .ok(onServer(TPrivilegeLevel.SELECT))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-        .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT))
-        .ok(onColumn("functional", "alltypes", "id", TPrivilegeLevel.SELECT))
-        .error(selectError("functional.alltypes"))
-        .error(selectError("functional.alltypes"), onServer(allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onTable("functional",
-            "alltypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onColumn("functional",
-            "alltypes", "id", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+    for (AuthzTest authzTest : new AuthzTest[]{
+        // Select a specific column on a table.
+        authorize("select id from functional.alltypes"),
+        // With clause with select.
+        authorize("with t as (select id from functional.alltypes) select * from t"),
+        // Select without referencing a column.
+        authorize("select 1 from functional.alltypes")}) {
+      authzTest.ok(onServer(TPrivilegeLevel.ALL))
+          .ok(onServer(TPrivilegeLevel.ALL))
+          .ok(onServer(TPrivilegeLevel.SELECT))
+          .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT))
+          .ok(onColumn("functional", "alltypes", "id", TPrivilegeLevel.SELECT))
+          .error(selectError("functional.alltypes"))
+          .error(selectError("functional.alltypes"), onServer(allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypes"), onDatabase("functional",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypes"), onTable("functional",
+              "alltypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+    }
 
     // Select a specific column on a view.
     // Column-level privileges on views are not currently supported.
@@ -248,22 +324,6 @@ public class AuthorizationTestV2 extends FrontendTestBase {
             onTable("functional_seq_snap", "subquery_view", allExcept(
             TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
 
-    // Select without referencing a column.
-    authorize("select 1 from functional.alltypes")
-        .ok(onServer(TPrivilegeLevel.ALL))
-        .ok(onServer(TPrivilegeLevel.SELECT))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-        .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT))
-        .error(selectError("functional.alltypes"))
-        .error(selectError("functional.alltypes"), onServer(allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onDatabase("functional",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onTable("functional", "alltypes",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
-
     // Select from non-existent database.
     authorize("select 1 from nodb.alltypes")
         .error(selectError("nodb.alltypes"));
@@ -290,12 +350,7 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(selectError("functional.alltypes"), onDatabase("functional",
             allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
         .error(selectError("functional.alltypes"), onTable("functional", "alltypes",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onColumn("functional", "alltypes",
-            new String[]{"id", "bool_col", "tinyint_col", "smallint_col", "int_col",
-            "bigint_col", "float_col", "double_col", "date_string_col", "string_col",
-            "timestamp_col", "year", "month"}, allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.SELECT)));
+            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
 
     // Select with columns referenced in function, where clause and group by.
     authorize("select count(id), int_col from functional.alltypes where id = 10 " +
@@ -314,9 +369,7 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(selectError("functional.alltypes"), onDatabase("functional",
             allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
         .error(selectError("functional.alltypes"), onTable("functional", "alltypes",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onColumn("functional", "alltypes",
-            "id", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
 
     // Select on tables with complex types.
     authorize("select a.int_struct_col.f1 from functional.allcomplextypes a " +
@@ -335,10 +388,7 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(selectError("functional.allcomplextypes"), onDatabase("functional",
             allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
         .error(selectError("functional.allcomplextypes"), onTable("functional",
-            "allcomplextypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.allcomplextypes"), onColumn("functional",
-            "allcomplextypes", new String[]{"id", "int_struct_col"}, allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+            "allcomplextypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
 
     authorize("select key, pos, item.f1, f2 from functional.allcomplextypes t, " +
         "t.struct_array_col, functional.allcomplextypes.int_map_col")
@@ -356,51 +406,84 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(selectError("functional.allcomplextypes"), onDatabase("functional",
             allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
         .error(selectError("functional.allcomplextypes"), onTable("functional",
-            "allcomplextypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.allcomplextypes"), onColumn("functional",
-            "allcomplextypes", new String[]{"struct_array_col", "int_map_col"},
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+            "allcomplextypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)));
+
+    for (AuthzTest authzTest : new AuthzTest[]{
+        // Select with cross join.
+        authorize("select * from functional.alltypes union all " +
+            "select * from functional.alltypessmall"),
+        // Union on tables.
+        authorize("select * from functional.alltypes a cross join " +
+            "functional.alltypessmall b")}) {
+      authzTest.ok(onServer(TPrivilegeLevel.ALL))
+          .ok(onServer(TPrivilegeLevel.SELECT))
+          .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
+              onTable("functional", "alltypessmall", TPrivilegeLevel.ALL))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT),
+              onTable("functional", "alltypessmall", TPrivilegeLevel.ALL))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
+              onTable("functional", "alltypessmall", TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT),
+              onTable("functional", "alltypessmall", TPrivilegeLevel.SELECT))
+          .ok(onColumn("functional", "alltypes", new String[]{"id", "bool_col",
+                  "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
+                  "double_col", "date_string_col", "string_col", "timestamp_col", "year",
+                  "month"}, TPrivilegeLevel.SELECT),
+              onColumn("functional", "alltypessmall", new String[]{"id", "bool_col",
+                  "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
+                  "double_col", "date_string_col", "string_col", "timestamp_col", "year",
+                  "month"}, TPrivilegeLevel.SELECT))
+          .error(selectError("functional.alltypes"))
+          .error(selectError("functional.alltypes"), onServer(
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypes"), onDatabase("functional",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypes"), onTable("functional", "alltypes",
+              allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)), onTable("functional",
+              "alltypessmall", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypessmall"), onColumn("functional", "alltypes",
+              new String[]{"id", "bool_col", "tinyint_col", "smallint_col", "int_col",
+                  "bigint_col", "float_col", "double_col", "date_string_col", "string_col",
+                  "timestamp_col", "year", "month"}, TPrivilegeLevel.SELECT))
+          .error(selectError("functional.alltypes"), onColumn("functional",
+              "alltypessmall", new String[]{"id", "bool_col", "tinyint_col", "smallint_col",
+                  "int_col", "bigint_col", "float_col", "double_col", "date_string_col",
+                  "string_col", "timestamp_col", "year", "month"}, TPrivilegeLevel.SELECT));
+    }
 
-    // Select with cross join.
-    authorize("select * from functional.alltypes a cross join " +
-        "functional.alltypessmall b")
+    // Union on views.
+    // Column-level privileges on views are not currently supported.
+    authorize("select id from functional.alltypes_view union all " +
+        "select x from functional.alltypes_view_sub")
         .ok(onServer(TPrivilegeLevel.ALL))
         .ok(onServer(TPrivilegeLevel.SELECT))
         .ok(onDatabase("functional", TPrivilegeLevel.ALL))
         .ok(onDatabase("functional", TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
-            onTable("functional", "alltypessmall", TPrivilegeLevel.ALL))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT),
-            onTable("functional", "alltypessmall", TPrivilegeLevel.ALL))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
-            onTable("functional", "alltypessmall", TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.SELECT),
-            onTable("functional", "alltypessmall", TPrivilegeLevel.SELECT))
-        .ok(onColumn("functional", "alltypes", new String[]{"id", "bool_col",
-            "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
-            "double_col", "date_string_col", "string_col", "timestamp_col", "year",
-            "month"}, TPrivilegeLevel.SELECT),
-            onColumn("functional", "alltypessmall", new String[]{"id", "bool_col",
-            "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
-            "double_col", "date_string_col", "string_col", "timestamp_col", "year",
-            "month"}, TPrivilegeLevel.SELECT))
-        .error(selectError("functional.alltypes"))
-        .error(selectError("functional.alltypes"), onServer(
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onDatabase("functional",
+        .ok(onTable("functional", "alltypes_view", TPrivilegeLevel.ALL),
+            onTable("functional", "alltypes_view_sub", TPrivilegeLevel.ALL))
+        .ok(onTable("functional", "alltypes_view", TPrivilegeLevel.SELECT),
+            onTable("functional", "alltypes_view_sub", TPrivilegeLevel.SELECT))
+        .error(selectError("functional.alltypes_view"))
+        .error(selectError("functional.alltypes_view"), onServer(allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
+        .error(selectError("functional.alltypes_view"), onDatabase("functional",
             allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onTable("functional", "alltypes",
-            allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)), onTable("functional",
-            "alltypessmall", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypes"), onColumn("functional", "alltypes",
-            new String[]{"id", "bool_col", "tinyint_col", "smallint_col", "int_col",
-            "bigint_col", "float_col", "double_col", "date_string_col", "string_col",
-            "timestamp_col", "year", "month"}, allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.SELECT)), onColumn("functional", "alltypessmall",
-            new String[]{"id", "bool_col", "tinyint_col", "smallint_col", "int_col",
-            "bigint_col", "float_col", "double_col", "date_string_col", "string_col",
-            "timestamp_col", "year", "month"}, allExcept(TPrivilegeLevel.ALL,
-            TPrivilegeLevel.SELECT)));
+        .error(selectError("functional.alltypes_view"), onTable("functional",
+            "alltypes_view", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)),
+            onTable("functional", "alltypes_view_sub", TPrivilegeLevel.SELECT))
+        .error(selectError("functional.alltypes_view_sub"), onTable("functional",
+            "alltypes_view_sub", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)),
+            onTable("functional", "alltypes_view", TPrivilegeLevel.SELECT));
+
+    // Union from non-existent databases.
+    authorize("select id from nodb.alltypes union all " +
+        "select id from functional.alltypesagg").error(selectError("nodb.alltypes"));
+
+    // Union from non-existent tables.
+    authorize("select id from functional.notbl union all " +
+        "select id from functional.alltypesagg").error(selectError("functional.notbl"));
   }
 
   @Test
@@ -421,33 +504,39 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(insertError("functional.zipcode_incomes"), onTable("functional",
             "zipcode_incomes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT)));
 
-    // Insert with select on a target table.
-    authorize("insert into functional.alltypes partition(month, year) " +
-        "select * from functional.alltypestiny where id < 100")
-        .ok(onServer(TPrivilegeLevel.ALL))
-        .ok(onServer(TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT))
-        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
-        .ok(onDatabase("functional", TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
-            onTable("functional", "alltypestiny", TPrivilegeLevel.ALL))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.INSERT),
-            onTable("functional", "alltypestiny", TPrivilegeLevel.SELECT))
-        .ok(onTable("functional", "alltypes", TPrivilegeLevel.INSERT),
-            onColumn("functional", "alltypestiny", new String[]{"id", "bool_col",
-                "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
-                "double_col", "date_string_col", "string_col", "timestamp_col", "year",
-                "month"}, TPrivilegeLevel.SELECT))
-        .error(selectError("functional.alltypestiny"))
-        .error(selectError("functional.alltypestiny"), onServer(allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT)))
-        .error(selectError("functional.alltypestiny"), onDatabase("functional", allExcept(
-            TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT)))
-        .error(insertError("functional.alltypes"), onTable("functional",
-            "alltypestiny", TPrivilegeLevel.SELECT), onTable("functional",
-            "alltypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT)))
-        .error(selectError("functional.alltypestiny"), onTable("functional",
-            "alltypestiny", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)),
-            onTable("functional", "alltypes", TPrivilegeLevel.INSERT));
+    for (AuthzTest test : new AuthzTest[]{
+        // With clause with insert.
+        authorize("with t as (select * from functional.alltypestiny) " +
+            "insert into functional.alltypes partition(month, year) " +
+            "select * from t"),
+        // Insert with select on a target table.
+        authorize("insert into functional.alltypes partition(month, year) " +
+            "select * from functional.alltypestiny where id < 100")}) {
+      test.ok(onServer(TPrivilegeLevel.ALL))
+          .ok(onServer(TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT))
+          .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+          .ok(onDatabase("functional", TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.ALL),
+              onTable("functional", "alltypestiny", TPrivilegeLevel.ALL))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.INSERT),
+              onTable("functional", "alltypestiny", TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes", TPrivilegeLevel.INSERT),
+              onColumn("functional", "alltypestiny", new String[]{"id", "bool_col",
+                  "tinyint_col", "smallint_col", "int_col", "bigint_col", "float_col",
+                  "double_col", "date_string_col", "string_col", "timestamp_col", "year",
+                  "month"}, TPrivilegeLevel.SELECT))
+          .error(selectError("functional.alltypestiny"))
+          .error(selectError("functional.alltypestiny"), onServer(allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT)))
+          .error(selectError("functional.alltypestiny"), onDatabase("functional", allExcept(
+              TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT, TPrivilegeLevel.SELECT)))
+          .error(insertError("functional.alltypes"), onTable("functional",
+              "alltypestiny", TPrivilegeLevel.SELECT), onTable("functional",
+              "alltypes", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.INSERT)))
+          .error(selectError("functional.alltypestiny"), onTable("functional",
+              "alltypestiny", allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT)),
+              onTable("functional", "alltypes", TPrivilegeLevel.INSERT));
+    }
 
     // Insert with select on a target view.
     // Column-level privileges on views are not currently supported.
@@ -626,6 +715,177 @@ public class AuthorizationTestV2 extends FrontendTestBase {
         .error(insertError("functional.alltypes_view"));
   }
 
+  @Test
+  public void testResetMetadata() throws ImpalaException {
+    // Invalidate metadata on server.
+    authorize("invalidate metadata")
+        .ok(onServer(TPrivilegeLevel.ALL))
+        .ok(onServer(TPrivilegeLevel.REFRESH))
+        .error(refreshError("server"));
+
+    // Invalidate metadata/refresh on a table / view
+    for(String name : new String[] {"alltypes", "alltypes_view"}) {
+      for (AuthzTest test : new AuthzTest[]{
+          authorize("invalidate metadata functional." + name),
+          authorize("refresh functional." + name)}) {
+        test.ok(onServer(TPrivilegeLevel.ALL))
+            .ok(onServer(TPrivilegeLevel.REFRESH))
+            .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+            .ok(onDatabase("functional", TPrivilegeLevel.REFRESH))
+            .ok(onTable("functional", name, TPrivilegeLevel.ALL))
+            .ok(onTable("functional", name, TPrivilegeLevel.REFRESH))
+            .error(refreshError("functional." + name))
+            .error(refreshError("functional." + name), onDatabase("functional", allExcept(
+                TPrivilegeLevel.ALL, TPrivilegeLevel.REFRESH)))
+            .error(refreshError("functional." + name), onTable("functional", name,
+                allExcept(TPrivilegeLevel.ALL, TPrivilegeLevel.REFRESH)));
+      }
+    }
+
+    authorize("refresh functions functional")
+        .ok(onServer(TPrivilegeLevel.REFRESH))
+        .ok(onDatabase("functional", TPrivilegeLevel.ALL))
+        .ok(onDatabase("functional", TPrivilegeLevel.REFRESH))
+        .error(refreshError("functional"))
+        .error(refreshError("functional"), onServer(allExcept(TPrivilegeLevel.ALL,
+            TPrivilegeLevel.REFRESH)))
+        .error(refreshError("functional"), onDatabase("functional", allExcept(
+            TPrivilegeLevel.ALL, TPrivilegeLevel.REFRESH)));
+
+    // Reset metadata in non-existent database.
+    authorize("invalidate metadata nodb").error(refreshError("default.nodb"));
+    authorize("refresh nodb").error(refreshError("default.nodb"));
+    authorize("refresh functions nodb").error(refreshError("nodb"));
+  }
+
+  @Test
+  public void testShow() throws ImpalaException {
+    // Show databases should always be allowed.
+    authorize("show databases").ok();
+
+    // Show tables.
+    AuthzTest test = authorize("show tables in functional");
+    for (TPrivilegeLevel privilege : TPrivilegeLevel.values()) {
+      test.ok(onServer(privilege))
+          .ok(onDatabase("functional", privilege))
+          .ok(onTable("functional", "alltypes", privilege));
+    }
+    test.error(accessError("functional.*.*"));
+
+    // Show functions.
+    test = authorize("show functions in functional");
+    for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+      test.ok(onServer(privilege))
+          .ok(onDatabase("functional", privilege));
+    }
+    test.error(accessError("functional"));
+
+    // Show tables in system database should always be allowed.
+    authorize("show tables in _impala_builtins").ok();
+
+    // Show tables for non-existent database.
+    authorize("show tables in nodb").error(accessError("nodb"));
+
+    // Show partitions, table stats, and column stats
+    for (AuthzTest authzTest: new AuthzTest[]{
+        authorize("show partitions functional.alltypes"),
+        authorize("show table stats functional.alltypes"),
+        authorize("show column stats functional.alltypes")}) {
+      for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+        authzTest.ok(onServer(privilege))
+            .ok(onDatabase("functional", privilege))
+            .ok(onTable("functional", "alltypes", privilege))
+            .error(accessError("functional.alltypes"), onColumn("functional", "alltypes",
+                "id", TPrivilegeLevel.SELECT));
+      }
+      authzTest.error(accessError("functional"));
+    }
+
+    // Show range partitions.dimtbl
+    for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+      authorize("show range partitions functional_kudu.dimtbl")
+          .ok(onServer(privilege))
+          .ok(onDatabase("functional_kudu", privilege))
+          .ok(onTable("functional_kudu", "dimtbl", privilege))
+          .error(accessError("functional_kudu.dimtbl"), onColumn("functional_kudu",
+              "dimtbl", "id", TPrivilegeLevel.SELECT))
+          .error(accessError("functional_kudu"));
+    }
+
+    // Show files.
+    for (AuthzTest authzTest : new AuthzTest[]{
+        authorize("show files in functional.alltypes"),
+        authorize("show files in functional.alltypes partition(month=10, year=2010)")}) {
+      for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+        authzTest.ok(onServer(privilege))
+            .ok(onDatabase("functional", privilege))
+            .ok(onTable("functional", "alltypes", privilege));
+      }
+      authzTest.error(accessError("functional"));
+    }
+
+    // Show current roles should always be allowed.
+    authorize("show current roles").ok();
+
+    // Show roles should always be allowed.
+    authorize("show roles").ok();
+
+    // Show role grant group should always be allowed.
+    authorize(String.format("show role grant group %s", USER.getName())).ok();
+
+    // Show grant role should always be allowed.
+    authorize(String.format("show grant role authz_test_role")).ok();
+
+    // Show create table.
+    test = authorize("show create table functional.alltypes");
+    for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+      test.ok(onServer(privilege))
+          .ok(onDatabase("functional", privilege))
+          .ok(onTable("functional", "alltypes", privilege));
+    }
+    test.error(accessError("functional"));
+    // Show create table on non-existent database.
+    authorize("show create table nodb.alltypes").error(accessError("nodb.alltypes"));
+    // Show create table on non-existent table.
+    authorize("show create table functional.notbl").error(accessError("functional.notbl"));
+
+    // Show create view.
+    test = authorize("show create view functional.alltypes_view");
+    for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+      test.ok(onServer(privilege, TPrivilegeLevel.SELECT))
+          .ok(onDatabase("functional", privilege, TPrivilegeLevel.SELECT))
+          .ok(onTable("functional", "alltypes_view", privilege),
+              onTable("functional", "alltypes", TPrivilegeLevel.SELECT));
+    }
+    test.error(accessError("functional"));
+    // Show create view on non-existent database.
+    authorize("show create view nodb.alltypes").error(accessError("nodb.alltypes"));
+    // Show create view on non-existent table.
+    authorize("show create view functional.notbl").error(accessError("functional.notbl"));
+
+    // Show create function.
+    ScalarFunction fn = addFunction("functional", "f");
+    try {
+      test = authorize("show create function functional.f");
+      for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+        test.ok(onServer(privilege))
+            .ok(onDatabase("functional", privilege));
+      }
+      test.error(accessError("functional"));
+      // Show create function on non-existent database.
+      authorize("show create function nodb.f").error(accessError("nodb"));
+      // Show create function on non-existent function.
+      authorize("show create function functional.nofn").error(accessError("functional"));
+    } finally {
+      removeFunction(fn);
+    }
+    // Show create function in system database should always be allowed.
+    authorize("show create function _impala_builtins.pi").ok();
+
+    // Show data sourcs should always be allowed.
+    authorize("show data sources").ok();
+  }
+
   private static String selectError(String object) {
     return "User '%s' does not have privileges to execute 'SELECT' on: " + object;
   }
@@ -638,6 +898,28 @@ public class AuthorizationTestV2 extends FrontendTestBase {
     return "User '%s' does not have privileges to access: " + object;
   }
 
+  private static String refreshError(String object) {
+    return "User '%s' does not have privileges to execute " +
+        "'INVALIDATE METADATA/REFRESH' on: " + object;
+  }
+
+  private ScalarFunction addFunction(String db, String fnName) {
+    ScalarFunction fn = ScalarFunction.createForTesting(db, fnName,
+        new ArrayList<Type>(), Type.INT, "/dummy", "dummy.class", null,
+        null, TFunctionBinaryType.NATIVE);
+    authzCatalog_.addFunction(fn);
+    return fn;
+  }
+
+  private void removeFunction(ScalarFunction fn) {
+    authzCatalog_.removeFunction(fn);
+  }
+
+  private TPrivilegeLevel[] viewMetadataPrivileges() {
+    return new TPrivilegeLevel[]{TPrivilegeLevel.ALL, TPrivilegeLevel.SELECT,
+        TPrivilegeLevel.INSERT, TPrivilegeLevel.REFRESH};
+  }
+
   private static TPrivilegeLevel[] allExcept(TPrivilegeLevel... excludedPrivLevels) {
     HashSet<TPrivilegeLevel> excludedSet = Sets.newHashSet(excludedPrivLevels);
     List<TPrivilegeLevel> privLevels = new ArrayList<>();


[4/7] impala git commit: IMPALA-7032: Disable codegen for CHAR type null literals

Posted by ta...@apache.org.
IMPALA-7032: Disable codegen for CHAR type null literals

Analogous to IMPALA-6435, we have to disable codegen for CHAR type null
literals. Otherwise we will crash in
impala::NullLiteral::GetCodegendComputeFn().

This change adds a test to make sure that the crash is fixed.

Change-Id: I34033362263cf1292418f69c5ca1a3b84aed39a9
Reviewed-on: http://gerrit.cloudera.org:8080/10409
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Lars Volker <lv...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a64cfc52
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a64cfc52
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a64cfc52

Branch: refs/heads/master
Commit: a64cfc523ecd5ee7f13c3c7ca63cafe510c4610a
Parents: 13a1acd
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue May 15 11:18:48 2018 -0700
Committer: Lars Volker <lv...@cloudera.com>
Committed: Wed May 16 00:00:15 2018 +0000

----------------------------------------------------------------------
 be/src/exprs/null-literal.cc                          |  4 ++++
 .../queries/QueryTest/disable-codegen.test            | 14 ++++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a64cfc52/be/src/exprs/null-literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/null-literal.cc b/be/src/exprs/null-literal.cc
index d2d4590..8ce3a71 100644
--- a/be/src/exprs/null-literal.cc
+++ b/be/src/exprs/null-literal.cc
@@ -108,6 +108,10 @@ Status NullLiteral::GetCodegendComputeFn(LlvmCodeGen* codegen, llvm::Function**
     return Status::OK();
   }
 
+  if (type_.type == TYPE_CHAR) {
+    return Status::Expected("Codegen not supported for CHAR");
+  }
+
   DCHECK_EQ(GetNumChildren(), 0);
   llvm::Value* args[2];
   *fn = CreateIrFunctionPrototype("NullLiteral", codegen, &args);

http://git-wip-us.apache.org/repos/asf/impala/blob/a64cfc52/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test
index 9f609d4..600ca45 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test
@@ -32,6 +32,7 @@ row_regex: .*Codegen Disabled: disabled due to optimization hints.*
 # IMPALA-6435: We do not codegen char columns. This fix checks for a
 # CHAR type literal in the expr and disables codegen. This query will crash
 # impala without the fix.
+set disable_codegen_rows_threshold=0;
 select count(*) from (
   select cast('a' as char(4)) as s from functional.alltypestiny
   union all
@@ -44,3 +45,16 @@ select count(*) from (
 ---- TYPES
 bigint
 ====
+---- QUERY
+# IMPALA-7032: Test that codegen gets disabled for CHAR type null literals in
+# the backend. We force codegen to be on in order to exercise the codegen code
+# for null literals.
+set disable_codegen_rows_threshold=0;
+select NULL from functional.alltypestiny
+  union select cast('a' as char(4)) from functional.alltypestiny
+---- RESULTS
+'a   '
+'NULL'
+---- TYPES
+char
+====


[2/7] impala git commit: IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row

Posted by ta...@apache.org.
IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row

TestQueries.test_subquery sometimes fails during exhaustive
tests.

In the tests we expect to catch an exception that is
prefixed by the "Query aborted:" string. The prefix is
usually added by impala_beeswax.py::wait_for_completion(),
but in rare cases it isn't added.

>From the point of the test it is irrelevant if the exception
is prefixed with "Query aborted:" or not, so I removed it
from the expected exception string.

Change-Id: I3b8655ad273b1dd7a601099f617db609e4a4797b
Reviewed-on: http://gerrit.cloudera.org:8080/10407
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fab65d44
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fab65d44
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fab65d44

Branch: refs/heads/master
Commit: fab65d44792f5a5effcb6f26f08c6ad865c4b64a
Parents: 4fab428
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue May 15 15:47:29 2018 +0200
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue May 15 23:37:06 2018 +0000

----------------------------------------------------------------------
 .../functional-query/queries/QueryTest/subquery.test         | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fab65d44/testdata/workloads/functional-query/queries/QueryTest/subquery.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/subquery.test b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
index 2d691ed..df1be9a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/subquery.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
@@ -929,7 +929,7 @@ WHERE int_col =
 ORDER BY id
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Uncorrelated subquery in arithmetic expr that returns multiple rows
@@ -940,7 +940,7 @@ WHERE int_col =
 ORDER BY id
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Uncorrelated subquery in binary predicate that returns scalar value at runtime
@@ -1000,7 +1000,7 @@ SELECT a FROM (values(1 a),(2),(3)) v
 WHERE a = (SELECT x FROM (values(1 x),(2),(3)) v)
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Subquery that returns more than one row
@@ -1009,7 +1009,7 @@ SELECT id FROM functional.alltypes
 WHERE id = (SELECT bigint_col FROM functional.alltypes_view)
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
+Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
 ====
 ---- QUERY
 # Runtime scalar subquery with offset.


[6/7] impala git commit: IMPALA-7025: ignore resources in some planner test

Posted by ta...@apache.org.
IMPALA-7025: ignore resources in some planner test

The issue was that the tablesample test verified the mem-estimate
number, which depends on file sizes, which can vary slightly between
data loads.

Instead of trying to tweak the test to avoid the issue, instead provide
a mechanism to ignore the exact values of resources in planner tests
where they are not significant.

Testing:
Manually modified some values in tablesample.test, made sure that the
test still passed. Manually modified the partition count in the
expected output, made sure that the test failed.

Change-Id: I91e3e416ec6242fbf22d9f566fdd1ce225cb16ac
Reviewed-on: http://gerrit.cloudera.org:8080/10410
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/be2d61a7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/be2d61a7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/be2d61a7

Branch: refs/heads/master
Commit: be2d61a7e2e21f5837c946641b094651c4461aa4
Parents: caf275c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon May 14 17:40:45 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed May 16 02:23:52 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/planner/PlannerTest.java  |  49 +++++----
 .../apache/impala/planner/PlannerTestBase.java  |  69 +++++++++----
 .../org/apache/impala/testutil/TestUtils.java   | 102 +++++++++++--------
 3 files changed, 136 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/be2d61a7/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 8b9166f..2667e74 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -34,6 +34,7 @@ import org.junit.Assume;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 // All planner tests, except for S3 specific tests should go here.
@@ -54,9 +55,8 @@ public class PlannerTest extends PlannerTestBase {
     // Tests that constant folding is applied to all relevant PlanNodes and DataSinks.
     // Note that not all Exprs are printed in the explain plan, so validating those
     // via this test is currently not possible.
-    TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("constant-folding", options);
+    runPlannerTestFile("constant-folding",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test
@@ -149,10 +149,9 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testFkPkJoinDetection() {
-    TQueryOptions options = defaultQueryOptions();
     // The FK/PK detection result is included in EXTENDED or higher.
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("fk-pk-join-detection", options);
+    runPlannerTestFile("fk-pk-join-detection",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test
@@ -269,7 +268,8 @@ public class PlannerTest extends PlannerTestBase {
   public void testDisableCodegenOptimization() {
     TQueryOptions options = new TQueryOptions();
     options.setDisable_codegen_rows_threshold(3000);
-    runPlannerTestFile("disable-codegen", options, false);
+    runPlannerTestFile("disable-codegen", options,
+        ImmutableSet.of(PlannerTestOption.INCLUDE_EXPLAIN_HEADER));
   }
 
   @Test
@@ -316,9 +316,8 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testParquetFiltering() {
-    TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("parquet-filtering", options);
+    runPlannerTestFile("parquet-filtering",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test
@@ -426,9 +425,11 @@ public class PlannerTest extends PlannerTestBase {
   public void testResourceRequirements() {
     // Tests the resource requirement computation from the planner.
     TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
-    runPlannerTestFile("resource-requirements", options, false);
+    runPlannerTestFile("resource-requirements", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.INCLUDE_EXPLAIN_HEADER,
+            PlannerTestOption.VALIDATE_RESOURCES));
   }
 
   @Test
@@ -438,7 +439,10 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
-    runPlannerTestFile("spillable-buffer-sizing", options, false);
+    runPlannerTestFile("spillable-buffer-sizing", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+            PlannerTestOption.INCLUDE_EXPLAIN_HEADER,
+            PlannerTestOption.VALIDATE_RESOURCES));
   }
 
   @Test
@@ -449,22 +453,24 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
     options.setMax_row_size(8L * 1024L * 1024L);
-    runPlannerTestFile("max-row-size", options, false);
+    runPlannerTestFile("max-row-size", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+          PlannerTestOption.INCLUDE_EXPLAIN_HEADER,
+          PlannerTestOption.VALIDATE_RESOURCES));
   }
 
   @Test
   public void testSortExprMaterialization() {
     addTestFunction("TestFn", Lists.newArrayList(Type.DOUBLE), false);
-    TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("sort-expr-materialization", options);
+    runPlannerTestFile("sort-expr-materialization",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test
   public void testTableSample() {
     TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("tablesample", options);
+    runPlannerTestFile("tablesample", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test
@@ -479,9 +485,8 @@ public class PlannerTest extends PlannerTestBase {
 
   @Test
   public void testPartitionPruning() {
-    TQueryOptions options = defaultQueryOptions();
-    options.setExplain_level(TExplainLevel.EXTENDED);
-    runPlannerTestFile("partition-pruning", options);
+    runPlannerTestFile("partition-pruning",
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/be2d61a7/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 3efbc63..bcd1fc3 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -45,6 +45,7 @@ import org.apache.impala.testutil.TestFileParser;
 import org.apache.impala.testutil.TestFileParser.Section;
 import org.apache.impala.testutil.TestFileParser.TestCase;
 import org.apache.impala.testutil.TestUtils;
+import org.apache.impala.testutil.TestUtils.ResultFilter;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
@@ -396,8 +397,8 @@ public class PlannerTestBase extends FrontendTestBase {
    * of 'testCase'.
    */
   private void runTestCase(TestCase testCase, StringBuilder errorLog,
-      StringBuilder actualOutput, String dbName, boolean ignoreExplainHeader)
-      throws CatalogException {
+      StringBuilder actualOutput, String dbName,
+      Set<PlannerTestOption> testOptions) throws CatalogException {
     String query = testCase.getQuery();
     LOG.info("running query " + query);
     if (query.isEmpty()) {
@@ -411,16 +412,16 @@ public class PlannerTestBase extends FrontendTestBase {
     queryCtx.client_request.query_options = testCase.getOptions();
     // Test single node plan, scan range locations, and column lineage.
     TExecRequest singleNodeExecRequest = testPlan(testCase, Section.PLAN, queryCtx.deepCopy(),
-        ignoreExplainHeader, errorLog, actualOutput);
+        testOptions, errorLog, actualOutput);
     validateTableIds(singleNodeExecRequest);
     checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkLimitCardinality(query, singleNodeExecRequest, errorLog);
     // Test distributed plan.
-    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx.deepCopy(), ignoreExplainHeader,
+    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx.deepCopy(), testOptions,
         errorLog, actualOutput);
     // test parallel plans
-    testPlan(testCase, Section.PARALLELPLANS, queryCtx.deepCopy(), ignoreExplainHeader,
+    testPlan(testCase, Section.PARALLELPLANS, queryCtx.deepCopy(), testOptions,
         errorLog, actualOutput);
   }
 
@@ -476,11 +477,10 @@ public class PlannerTestBase extends FrontendTestBase {
    * Returns the produced exec request or null if there was an error generating
    * the plan.
    *
-   * If ignoreExplainHeader is true, the explain header with warnings and resource
-   * estimates is stripped out.
+   * testOptions control exactly how the plan is generated and compared.
    */
   private TExecRequest testPlan(TestCase testCase, Section section,
-      TQueryCtx queryCtx, boolean ignoreExplainHeader,
+      TQueryCtx queryCtx, Set<PlannerTestOption> testOptions,
       StringBuilder errorLog, StringBuilder actualOutput) {
     String query = testCase.getQuery();
     queryCtx.client_request.setStmt(query);
@@ -516,7 +516,9 @@ public class PlannerTestBase extends FrontendTestBase {
     if (execRequest == null) return null;
 
     String explainStr = explainBuilder.toString();
-    if (ignoreExplainHeader) explainStr = removeExplainHeader(explainStr);
+    if (!testOptions.contains(PlannerTestOption.INCLUDE_EXPLAIN_HEADER)) {
+      explainStr = removeExplainHeader(explainStr);
+    }
     actualOutput.append(explainStr);
     LOG.info(section.toString() + ":" + explainStr);
     if (expectedErrorMsg != null) {
@@ -524,8 +526,13 @@ public class PlannerTestBase extends FrontendTestBase {
           "\nExpected failure, but query produced %s.\nQuery:\n%s\n\n%s:\n%s",
           section, query, section, explainStr));
     } else {
+      List<ResultFilter> resultFilters =
+          Lists.<ResultFilter>newArrayList(TestUtils.FILE_SIZE_FILTER);
+      if (!testOptions.contains(PlannerTestOption.VALIDATE_RESOURCES)) {
+        resultFilters.addAll(TestUtils.RESOURCE_FILTERS);
+      }
       String planDiff = TestUtils.compareOutput(
-          Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, true);
+          Lists.newArrayList(explainStr.split("\n")), expectedPlan, true, resultFilters);
       if (!planDiff.isEmpty()) {
         errorLog.append(String.format(
             "\nSection %s of query:\n%s\n\n%s", section, query, planDiff));
@@ -587,7 +594,8 @@ public class PlannerTestBase extends FrontendTestBase {
     if (expectedLocations.size() > 0 && locationsStr != null) {
       // Locations' order does not matter.
       String result = TestUtils.compareOutput(
-          Lists.newArrayList(locationsStr.split("\n")), expectedLocations, false, false);
+          Lists.newArrayList(locationsStr.split("\n")), expectedLocations, false,
+          Collections.<TestUtils.ResultFilter>emptyList());
       if (!result.isEmpty()) {
         errorLog.append("section " + Section.SCANRANGELOCATIONS + " of query:\n"
             + query + "\n" + result);
@@ -749,23 +757,46 @@ public class PlannerTestBase extends FrontendTestBase {
     return explain;
   }
 
+  /**
+   * Assorted binary options that alter the behaviour of planner tests, generally
+   * enabling additional more-detailed checks.
+   */
+  protected static enum PlannerTestOption {
+    // Generate extended explain plans (default is STANDARD).
+    EXTENDED_EXPLAIN,
+    // Include the header of the explain plan (default is to strip the explain header).
+    INCLUDE_EXPLAIN_HEADER,
+    // Validate the values of resource requirements (default is to ignore differences
+    // in resource values).
+    VALIDATE_RESOURCES,
+  }
+
   protected void runPlannerTestFile(String testFile, TQueryOptions options) {
-    runPlannerTestFile(testFile, options, true);
+    runPlannerTestFile(testFile, "default", options,
+        Collections.<PlannerTestOption>emptySet());
   }
 
   protected void runPlannerTestFile(String testFile, TQueryOptions options,
-      boolean ignoreExplainHeader) {
-    runPlannerTestFile(testFile, "default", options, ignoreExplainHeader);
+      Set<PlannerTestOption> testOptions) {
+    runPlannerTestFile(testFile, "default", options, testOptions);
+  }
+
+  protected void runPlannerTestFile(
+      String testFile, Set<PlannerTestOption> testOptions) {
+    runPlannerTestFile(testFile, "default", defaultQueryOptions(), testOptions);
   }
 
   private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
-      boolean ignoreExplainHeader) {
+        Set<PlannerTestOption> testOptions) {
     String fileName = testDir_.resolve(testFile + ".test").toString();
     if (options == null) {
       options = defaultQueryOptions();
     } else {
       options = mergeQueryOptions(defaultQueryOptions(), options);
     }
+    if (testOptions.contains(PlannerTestOption.EXTENDED_EXPLAIN)) {
+      options.setExplain_level(TExplainLevel.EXTENDED);
+    }
     TestFileParser queryFileParser = new TestFileParser(fileName, options);
     StringBuilder actualOutput = new StringBuilder();
 
@@ -782,7 +813,7 @@ public class PlannerTestBase extends FrontendTestBase {
         actualOutput.append("\n");
       }
       try {
-        runTestCase(testCase, errorLog, actualOutput, dbName, ignoreExplainHeader);
+        runTestCase(testCase, errorLog, actualOutput, dbName, testOptions);
       } catch (CatalogException e) {
         errorLog.append(String.format("Failed to plan query\n%s\n%s",
             testCase.getQuery(), e.getMessage()));
@@ -808,10 +839,12 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   protected void runPlannerTestFile(String testFile) {
-    runPlannerTestFile(testFile, "default", null, true);
+    runPlannerTestFile(testFile, "default", defaultQueryOptions(),
+        Collections.<PlannerTestOption>emptySet());
   }
 
   protected void runPlannerTestFile(String testFile, String dbName) {
-    runPlannerTestFile(testFile, dbName, null, true);
+    runPlannerTestFile(testFile, dbName, defaultQueryOptions(),
+        Collections.<PlannerTestOption>emptySet());
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/be2d61a7/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
index 629c44b..cbc3d1b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
+++ b/fe/src/test/java/org/apache/impala/testutil/TestUtils.java
@@ -20,9 +20,11 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.TimeZone;
@@ -56,7 +58,12 @@ public class TestUtils {
   // than a literal
   private final static String REGEX_AGAINST_ACTUAL = "regex:";
 
-  interface ResultFilter {
+  // Regexes that match various elements in plan.
+  private final static String NUMBER_REGEX = "\\d+(\\.\\d+)?";
+  private final static String BYTE_SUFFIX_REGEX = "[KMGT]?B";
+  private final static String BYTE_VALUE_REGEX = NUMBER_REGEX + BYTE_SUFFIX_REGEX;
+
+  public interface ResultFilter {
     public boolean matches(String input);
     public String transform(String input);
   }
@@ -82,41 +89,54 @@ public class TestUtils {
     }
   }
 
-  static PathFilter[] pathFilterList_ = {
-    new PathFilter("hdfs:"),
-    new PathFilter("file: ")
-  };
 
-  // File size could vary from run to run. For example, the parquet file header size
-  // or column metadata size could change if the Impala version changes. That doesn't
-  // mean anything is wrong with the plan, so we want to filter the file size out.
-  static class FileSizeFilter implements ResultFilter {
-    private final static String BYTE_FILTER = "[KMGT]?B";
-    private final static String NUMBER_FILTER = "\\d+(\\.\\d+)?";
-    private final static String FILTER_KEY = " size=";
+  /**
+   * Filter to ignore the value from elements in the format key=value.
+   */
+  public static class IgnoreValueFilter implements ResultFilter {
+    // Literal string containing the key name.
+    private final String keyPrefix;
+    private final String valueRegex;
+
+    /**
+     * Create a filter that ignores the value from key value pairs where the key is
+     * the literal 'key' value and the value matches 'valueRegex'.
+     */
+    public IgnoreValueFilter(String key, String valueRegex) {
+      // Include leading space to avoid matching partial keys, e.g. if key is "bar" we
+      // don't want to match "foobar=".
+      this.keyPrefix = " " + key + "=";
+      this.valueRegex = valueRegex;
+    }
 
-    public boolean matches(String input) { return input.contains(FILTER_KEY); }
+    public boolean matches(String input) { return input.contains(keyPrefix); }
 
     public String transform(String input) {
-      return input.replaceAll(FILTER_KEY + NUMBER_FILTER + BYTE_FILTER, FILTER_KEY);
+      return input.replaceAll(keyPrefix + valueRegex, keyPrefix);
     }
   }
-
-  static FileSizeFilter fileSizeFilter_ = new FileSizeFilter();
+  // File size could vary from run to run. For example, the parquet file header size
+  // or column metadata size could change if the Impala version changes. That doesn't
+  // mean anything is wrong with the plan, so we want to filter the file size out.
+  public static final IgnoreValueFilter FILE_SIZE_FILTER =
+      new IgnoreValueFilter("size", BYTE_VALUE_REGEX);
 
   // Ignore the exact estimated row count, which depends on the file sizes.
-  static class ScanRangeRowCountFilter implements ResultFilter {
-    private final static String NUMBER_FILTER = "\\d+(\\.\\d+)?";
-    private final static String FILTER_KEY = " max-scan-range-rows=";
-
-    public boolean matches(String input) { return input.contains(FILTER_KEY); }
+  static IgnoreValueFilter SCAN_RANGE_ROW_COUNT_FILTER =
+      new IgnoreValueFilter("max-scan-range-rows", NUMBER_REGEX);
 
-    public String transform(String input) {
-      return input.replaceAll(FILTER_KEY + NUMBER_FILTER, FILTER_KEY);
-    }
-  }
+  // Filters that are always applied
+  private static final List<ResultFilter> DEFAULT_FILTERS = Arrays.<ResultFilter>asList(
+    SCAN_RANGE_ROW_COUNT_FILTER, new PathFilter("hdfs:"), new PathFilter("file: "));
 
-  static ScanRangeRowCountFilter scanRangeRowCountFilter_ = new ScanRangeRowCountFilter();
+  // Filters that ignore the values of resource requirements that appear in
+  // "EXTENDED" and above explain plans.
+  public static final List<ResultFilter> RESOURCE_FILTERS = Arrays.<ResultFilter>asList(
+      new IgnoreValueFilter("mem-estimate", BYTE_VALUE_REGEX),
+      new IgnoreValueFilter("mem-reservation", BYTE_VALUE_REGEX),
+      new IgnoreValueFilter("thread-reservation", NUMBER_REGEX),
+      new IgnoreValueFilter("Memory", BYTE_VALUE_REGEX),
+      new IgnoreValueFilter("Threads", NUMBER_REGEX));
 
   /**
    * Do a line-by-line comparison of actual and expected output.
@@ -126,12 +146,14 @@ public class TestUtils {
    * the expected line (ignoring the expectedFilePrefix prefix).
    * If orderMatters is false, we consider actual to match expected if they
    * both contains the same output lines regardless of order.
+   * lineFilters is a list of filters that are applied to corresponding lines in the
+   * actual and expected output if the filter matches the expected output.
    *
    * @return an error message if actual does not match expected, "" otherwise.
    */
   public static String compareOutput(
       ArrayList<String> actual, ArrayList<String> expected, boolean orderMatters,
-      boolean filterFileSize) {
+      List<ResultFilter> lineFilters) {
     if (!orderMatters) {
       Collections.sort(actual);
       Collections.sort(expected);
@@ -141,26 +163,18 @@ public class TestUtils {
     for (int i = 0; i < maxLen; ++i) {
       String expectedStr = expected.get(i).trim();
       String actualStr = actual.get(i);
-      // Filter out contents that change run to run but don't affect compare result.
+      // Apply all default and caller-supplied filters to the expected and actual output.
       boolean containsPrefix = false;
-      for (PathFilter filter: pathFilterList_) {
-        if (filter.matches(expectedStr)) {
-          containsPrefix = true;
-          expectedStr = filter.transform(expectedStr);
-          actualStr = filter.transform(actualStr);
-          break;
+      for (List<ResultFilter> filters:
+          Arrays.<List<ResultFilter>>asList(DEFAULT_FILTERS, lineFilters)) {
+        for (ResultFilter filter: filters) {
+          if (filter.matches(expectedStr)) {
+            containsPrefix = true;
+            expectedStr = filter.transform(expectedStr);
+            actualStr = filter.transform(actualStr);
+          }
         }
       }
-      if (filterFileSize && fileSizeFilter_.matches(expectedStr)) {
-        containsPrefix = true;
-        expectedStr = fileSizeFilter_.transform(expectedStr);
-        actualStr = fileSizeFilter_.transform(actualStr);
-      }
-      if (scanRangeRowCountFilter_.matches(expectedStr)) {
-        containsPrefix = true;
-        expectedStr = scanRangeRowCountFilter_.transform(expectedStr);
-        actualStr = scanRangeRowCountFilter_.transform(actualStr);
-      }
 
       boolean ignoreAfter = false;
       for (int j = 0; j < ignoreContentAfter_.length; ++j) {


[5/7] impala git commit: IMPALA-6997: Avoid redundant dumping in SetMemLimitExceeded()

Posted by ta...@apache.org.
IMPALA-6997: Avoid redundant dumping in SetMemLimitExceeded()

When a UDF hits a MemLimitExceeded, the query does not
immediately abort. Instead, UDFs rely on the caller
checking the query_status_ periodically. This means that
on some codepaths, UDFs can call SetMemLimitExceeded()
many times (e.g. once per row) before the query fragment
exits.

RuntimeState::SetMemLimitExceeded() currently constructs
a MemLimitExceeded Status and dumps it for each call, even
if the query has already hit an error. This is expensive
and can delay an fragment from exiting when UDFs are
repeatedly hitting MemLimitExceeded.

This changes SetMemLimitExceeded() to avoid dumping if
the query_status_ is already not ok.

Change-Id: I92b87f370a68a2f695ebbc2520a98dd143730701
Reviewed-on: http://gerrit.cloudera.org:8080/10364
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/caf275c1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/caf275c1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/caf275c1

Branch: refs/heads/master
Commit: caf275c11a62c33d0211e71f3285c4977dd6799d
Parents: a64cfc5
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Wed May 9 15:05:37 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed May 16 02:22:24 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/runtime-state.cc | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/caf275c1/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 15b3354..041c19b 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -206,6 +206,16 @@ Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
 
 void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
     int64_t failed_allocation_size, const ErrorMsg* msg) {
+  // Constructing the MemLimitExceeded and logging it is not cheap, so
+  // avoid the cost if the query has already hit an error.
+  // This is particularly important on the UDF codepath, because the UDF codepath
+  // cannot abort the fragment immediately. It relies on callers checking status
+  // periodically. This means that this function could be called a large number of times
+  // (e.g. once per row) before the fragment aborts. See IMPALA-6997.
+  {
+    lock_guard<SpinLock> l(query_status_lock_);
+    if (!query_status_.ok()) return;
+  }
   Status status = tracker->MemLimitExceeded(this, msg == nullptr ? "" : msg->msg(),
       failed_allocation_size);
   {


[3/7] impala git commit: IMPALA-7003: Deflake erasure coding data loading

Posted by ta...@apache.org.
IMPALA-7003: Deflake erasure coding data loading

Erasure coding data loading is flaky in two ways:
1. HBase sometimes doesn't work because of HBase-19369
2. Nested data loading sometimes fails because the HDFS namenode cannot
   find enough good datanodes.

For problem 1, this patch enables erasure coding only on /test-warehouse
directory. For problem 2, this patch sets
dfs.namenode.redundancy.considerLoad to false, preventing namenode from
excluding heavily-loaded datanodes.

Change-Id: I219106cd3ec7ffab7a834700f2a722b165e5f66c
Reviewed-on: http://gerrit.cloudera.org:8080/10362
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/13a1acd7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/13a1acd7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/13a1acd7

Branch: refs/heads/master
Commit: 13a1acd7e42b533c39b9f4eea1c17823bde4c1c5
Parents: fab65d4
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed May 9 14:53:07 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue May 15 23:59:58 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh                            |  2 +-
 testdata/bin/create-load-data.sh                | 37 ++++++++++++++------
 testdata/bin/load-test-warehouse-snapshot.sh    |  5 +++
 testdata/bin/setup-hdfs-env.sh                  |  2 +-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl   | 10 ++++++
 5 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/13a1acd7/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 97bec30..8e58332 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -452,7 +452,7 @@ elif [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
       return 1
     fi
     export HDFS_ERASURECODE_POLICY="RS-3-2-1024k"
-    export HDFS_ERASURECODE_PATH="/"
+    export HDFS_ERASURECODE_PATH="/test-warehouse"
   fi
 else
   echo "Unsupported filesystem '$TARGET_FILESYSTEM'"

http://git-wip-us.apache.org/repos/asf/impala/blob/13a1acd7/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 280105d..e5b0f87 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -96,14 +96,6 @@ do
   shift;
 done
 
-# The hdfs environment script sets up kms (encryption) and cache pools (hdfs caching).
-# On a non-hdfs filesystem, we don't test encryption or hdfs caching, so this setup is not
-# needed.
-if [[ "${TARGET_FILESYSTEM}" == "hdfs" ]]; then
-  run-step "Setting up HDFS environment" setup-hdfs-env.log \
-      ${IMPALA_HOME}/testdata/bin/setup-hdfs-env.sh
-fi
-
 if [[ $SKIP_METADATA_LOAD -eq 0  && "$SNAPSHOT_FILE" = "" ]]; then
   if [[ -z "$REMOTE_LOAD" ]]; then
     run-step "Loading Hive Builtins" load-hive-builtins.log \
@@ -291,6 +283,14 @@ function copy-and-load-dependent-tables {
     /tmp/alltypes_rc /tmp/alltypes_seq
   hadoop fs -mkdir -p /tmp/alltypes_seq/year=2009 \
     /tmp/alltypes_rc/year=2009
+
+  # The file written by hive to /test-warehouse will be strangely replicated rather than
+  # erasure coded if EC is not set in /tmp
+  if [[ -n "${HDFS_ERASURECODE_POLICY:-}" ]]; then
+    hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" -path "/tmp/alltypes_rc"
+    hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" -path "/tmp/alltypes_seq"
+  fi
+
   hadoop fs -cp /test-warehouse/alltypes_seq/year=2009/month=2/ /tmp/alltypes_seq/year=2009
   hadoop fs -cp /test-warehouse/alltypes_rc/year=2009/month=3/ /tmp/alltypes_rc/year=2009
 
@@ -468,7 +468,16 @@ function copy-and-load-ext-data-source {
     ${IMPALA_HOME}/testdata/bin/create-data-source-table.sql
 }
 
-function wait-hdfs-replication {
+function check-hdfs-health {
+  if [[ -n "${HDFS_ERASURECODE_POLICY:-}" ]]; then
+    if ! grep "Replicated Blocks:[[:space:]]*#[[:space:]]*Total size:[[:space:]]*0 B"\
+        <<< $(hdfs fsck /test-warehouse | tr '\n' '#'); then
+        echo "There are some replicated files despite that erasure coding is on"
+        echo "Failing the data loading job"
+        exit 1
+    fi
+    return
+  fi
   MAX_FSCK=30
   SLEEP_SEC=120
   LAST_NUMBER_UNDER_REPLICATED=-1
@@ -518,6 +527,14 @@ if [[ -z "$REMOTE_LOAD" ]]; then
     ${START_CLUSTER_ARGS}
 fi
 
+# The hdfs environment script sets up kms (encryption) and cache pools (hdfs caching).
+# On a non-hdfs filesystem, we don't test encryption or hdfs caching, so this setup is not
+# needed.
+if [[ "${TARGET_FILESYSTEM}" == "hdfs" ]]; then
+  run-step "Setting up HDFS environment" setup-hdfs-env.log \
+      ${IMPALA_HOME}/testdata/bin/setup-hdfs-env.sh
+fi
+
 if [ $SKIP_METADATA_LOAD -eq 0 ]; then
   run-step "Loading custom schemas" load-custom-schemas.log load-custom-schemas
   # Run some steps in parallel, with run-step-backgroundable / run-step-wait-all.
@@ -580,7 +597,7 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
   run-step "Creating internal HBase table" create-internal-hbase-table.log \
       create-internal-hbase-table
 
-  run-step "Waiting for HDFS replication" wait-hdfs-replication.log wait-hdfs-replication
+  run-step "Checking HDFS health" check-hdfs-health.log check-hdfs-health
 fi
 
 # TODO: Investigate why all stats are not preserved. Theoretically, we only need to

http://git-wip-us.apache.org/repos/asf/impala/blob/13a1acd7/testdata/bin/load-test-warehouse-snapshot.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/load-test-warehouse-snapshot.sh b/testdata/bin/load-test-warehouse-snapshot.sh
index 311a9ea..fe5dd2a 100755
--- a/testdata/bin/load-test-warehouse-snapshot.sh
+++ b/testdata/bin/load-test-warehouse-snapshot.sh
@@ -72,6 +72,11 @@ if [[ "$REPLY" =~ ^[Yy]$ ]]; then
     fi
     echo "Creating ${TEST_WAREHOUSE_DIR} directory"
     hadoop fs -mkdir -p ${FILESYSTEM_PREFIX}${TEST_WAREHOUSE_DIR}
+    if [[ -n "${HDFS_ERASURECODE_POLICY:-}" ]]; then
+      hdfs ec -enablePolicy -policy "${HDFS_ERASURECODE_POLICY}"
+      hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" \
+        -path "${HDFS_ERASURECODE_PATH:=/test-warehouse}"
+    fi
 
     # TODO: commented out because of regressions in local end-to-end testing. See
     # IMPALA-4345

http://git-wip-us.apache.org/repos/asf/impala/blob/13a1acd7/testdata/bin/setup-hdfs-env.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/setup-hdfs-env.sh b/testdata/bin/setup-hdfs-env.sh
index a07a9dd..552c48b 100755
--- a/testdata/bin/setup-hdfs-env.sh
+++ b/testdata/bin/setup-hdfs-env.sh
@@ -76,5 +76,5 @@ fi
 if [[ -n "${HDFS_ERASURECODE_POLICY:-}" ]]; then
   hdfs ec -enablePolicy -policy "${HDFS_ERASURECODE_POLICY}"
   hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" \
-    -path "${HDFS_ERASURECODE_PATH:=/}"
+    -path "${HDFS_ERASURECODE_PATH:=/test-warehouse}"
 fi

http://git-wip-us.apache.org/repos/asf/impala/blob/13a1acd7/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
index 6882fa3..717ae7c 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
@@ -114,12 +114,22 @@
     <value>true</value>
   </property>
 
+  <!-- The default behavior of the namenode is to exclude datanodes with the number of
+    connections 2x higher than the average among all the datanodes from being considered
+    for replication/EC. In the minicluster we have to use every datanode for every block
+    so this should be disabled. -->
+  <property>
+    <name>dfs.namenode.redundancy.considerLoad</name>
+    <value>false</value>
+  </property>
+
   <!-- Location of the KMS key provider -->
   <property>
     <name>dfs.encryption.key.provider.uri</name>
     <value>kms://http@127.0.0.1:9600/kms</value>
   </property>
 
+
   <!-- BEGIN Kerberos settings -->
   <!-- We use the MiniKdc; it generates a keytab and krb5.conf; we point
        everyone at that one keytab and go to town... -->