Posted to commits@impala.apache.org by he...@apache.org on 2016/10/28 20:02:35 UTC

[1/5] incubator-impala git commit: IMPALA-3788: Support for Kudu 'read-your-writes' consistency

Repository: incubator-impala
Updated Branches:
  refs/heads/master c24e9da91 -> 3b8a0891d


IMPALA-3788: Support for Kudu 'read-your-writes' consistency

Kudu clients expose a 'latest observed timestamp' that can
be read and set. A client that performs writes can capture
this timestamp and hand it to another client before a read,
ensuring that data as of that timestamp is visible to that
read.
This adds support for this feature _for reads within a
session_ by capturing the latest observed timestamp when the
KuduTableSink is sending its last update to the coordinator.
The timestamp is sent with other post-write information, and
is aggregated (i.e. taking the max) at the coordinator. The
max is stored in the session, and that value is then set in
the Kudu client on future scans.
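
A minimal sketch of the propagation described above (simplified,
hypothetical types; only GetLatestObservedTimestamp() and
SetLatestObservedTimestamp() are the actual Kudu client calls used by
this patch):

  // Sketch only: stand-ins for Impala's real classes.
  #include <algorithm>
  #include <cstdint>
  #include <vector>

  struct PartitionStatus {
    // Thrift i64 carrying an unsigned timestamp; filled in by the table sink
    // from kudu_client->GetLatestObservedTimestamp() at FlushFinal() time.
    int64_t kudu_latest_observed_ts = 0;
  };

  // Coordinator side: keep the max timestamp reported by any sink.
  uint64_t LatestKuduTimestamp(const std::vector<PartitionStatus>& partitions) {
    uint64_t max_ts = 0;
    for (const auto& p : partitions) {
      max_ts = std::max(max_ts, static_cast<uint64_t>(p.kudu_latest_observed_ts));
    }
    return max_ts;
  }

  // Session side (pseudocode): the session keeps the running max, and every
  // later Kudu scan in the same session applies it before reading:
  //
  //   session.kudu_latest_observed_ts = std::max(
  //       session.kudu_latest_observed_ts, LatestKuduTimestamp(partitions));
  //   ...
  //   if (session.kudu_latest_observed_ts > 0) {
  //     kudu_client->SetLatestObservedTimestamp(session.kudu_latest_observed_ts);
  //   }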

This is tested by running the Kudu tests after removing the
delays that had been introduced to work around writes not
always being visible to subsequent reads. Before this
change, running those tests without the delays could return
inconsistent results.

Change-Id: I6bcb5fc218ad4ab935343a55b2188441d8c7dfbd
Reviewed-on: http://gerrit.cloudera.org:8080/4779
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c01644bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c01644bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c01644bc

Branch: refs/heads/master
Commit: c01644bcb9746c440cc6fd425a564ec40ea6d27c
Parents: c24e9da
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Thu Oct 20 15:21:53 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 26 21:11:06 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node.cc              |  5 +++++
 be/src/exec/kudu-table-sink.cc             |  3 +++
 be/src/runtime/coordinator.cc              | 11 +++++++++++
 be/src/runtime/coordinator.h               |  5 +++++
 be/src/service/impala-hs2-server.cc        |  1 +
 be/src/service/impala-server.cc            |  3 +++
 be/src/service/impala-server.h             |  4 ++++
 be/src/service/query-exec-state.cc         |  9 +++++++++
 common/thrift/ImpalaInternalService.thrift |  7 +++++++
 tests/common/impala_test_suite.py          |  4 +---
 tests/query_test/test_kudu.py              |  6 ++----
 11 files changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 4a162a8..1aef102 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -128,6 +128,11 @@ Status KuduScanNode::Open(RuntimeState* state) {
 
   KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client");
 
+  uint64_t latest_ts = static_cast<uint64_t>(
+      max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));
+  VLOG_RPC << "Latest observed Kudu timestamp: " << latest_ts;
+  if (latest_ts > 0) client_->SetLatestObservedTimestamp(latest_ts);
+
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
       "Unable to open Kudu table");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 70a74a9..a9beb29 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -278,6 +278,7 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
 
   // Get the pending errors from the Kudu session. If errors overflowed the error buffer
   // we can't be sure all errors can be ignored, so an error status will be reported.
+  // TODO: Make sure Kudu handles conflict errors properly if IGNORE is set (KUDU-1563).
   bool error_overflow = false;
   session_->GetPendingErrors(&errors, &error_overflow);
   if (UNLIKELY(error_overflow)) {
@@ -327,6 +328,8 @@ Status KuduTableSink::FlushFinal(RuntimeState* state) {
   Status status = CheckForErrors(state);
   (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
       rows_written_->value() - kudu_error_counter_->value());
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_kudu_latest_observed_ts(
+      client_->GetLatestObservedTimestamp());
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 0f41deb..f991e1c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1670,6 +1670,8 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
       status->__set_num_modified_rows(
           status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_kudu_latest_observed_ts(std::max(
+          partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
       status->__set_id(partition.second.id);
       status->__set_partition_base_dir(partition.second.partition_base_dir);
 
@@ -1736,6 +1738,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
   return Status::OK();
 }
 
+uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
+  uint64_t max_ts = 0;
+  for (const auto& entry : per_partition_status_) {
+    max_ts = std::max(max_ts,
+        static_cast<uint64_t>(entry.second.kudu_latest_observed_ts));
+  }
+  return max_ts;
+}
+
 RuntimeState* Coordinator::runtime_state() {
   return executor_ == NULL ? NULL : executor_->runtime_state();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9904def..066d532 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -177,6 +177,11 @@ class Coordinator {
   /// This is safe to call only after Wait()
   const PartitionStatusMap& per_partition_status() { return per_partition_status_; }
 
+  /// Returns the latest Kudu timestamp observed across any backends where DML into Kudu
+  /// was executed, or 0 if there were no Kudu timestamps reported.
+  /// This should only be called after Wait().
+  uint64_t GetLatestKuduInsertTimestamp() const;
+
   /// Gathers all updates to the catalog required once this query has completed execution.
   /// Returns true if a catalog update is required, false otherwise.
   /// Must only be called after Wait()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 488a1ee..05a58a4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -286,6 +286,7 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
   state->network_address = ThriftServer::GetThreadConnectionContext()->network_address;
   state->last_accessed_ms = UnixMillis();
   state->hs2_version = min(MAX_SUPPORTED_HS2_VERSION, request.client_protocol);
+  state->kudu_latest_observed_ts = 0;
 
   // If the username was set by a lower-level transport, use it.
   const ThriftServer::Username& username =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index bf83eec..4886bdf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1159,6 +1159,7 @@ void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
   // proxy user is authorized to delegate as this user.
   if (!do_as_user.empty()) state->__set_delegated_user(do_as_user);
   state->network_address = network_address;
+  state->__set_kudu_latest_observed_ts(kudu_latest_observed_ts);
 }
 
 void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
@@ -1625,6 +1626,8 @@ void ImpalaServer::ConnectionStart(
     session_state->session_type = TSessionType::BEESWAX;
     session_state->network_address = connection_context.network_address;
     session_state->default_query_options = default_query_options_;
+    session_state->kudu_latest_observed_ts = 0;
+
     // If the username was set by a lower-level transport, use it.
     if (!connection_context.username.empty()) {
       session_state->connected_user = connection_context.username;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 53f3384..7d0f7c9 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -708,6 +708,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
     /// Time the session was last accessed.
     int64_t last_accessed_ms;
 
+    /// The latest Kudu timestamp observed after DML operations executed within this
+    /// session.
+    uint64_t kudu_latest_observed_ts;
+
     /// Number of RPCs concurrently accessing this session state. Used to detect when a
     /// session may be correctly expired after a timeout (when ref_count == 0). Typically
     /// at most one RPC will be issued against a session at a time, but clients may do

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 3dc9d6c..7c40d37 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -537,6 +537,15 @@ void ImpalaServer::QueryExecState::Done() {
   query_events_->MarkEvent("Unregister query");
 
   if (coord_.get() != NULL) {
+    // Update latest observed Kudu timestamp stored in the session.
+    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    if (latest_kudu_ts > 0) {
+      VLOG_RPC << "Updating session latest observed Kudu timestamp: " << latest_kudu_ts;
+      lock_guard<mutex> session_lock(session_->lock);
+      session_->kudu_latest_observed_ts = std::max<uint64_t>(
+          session_->kudu_latest_observed_ts, latest_kudu_ts);
+    }
+
     // Release any reserved resources.
     Status status = exec_env_->scheduler()->Release(schedule_.get());
     if (!status.ok()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index e9c3119..15c0020 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -240,6 +240,9 @@ struct TSessionState {
 
   // Client network address
   4: required Types.TNetworkAddress network_address
+
+  // If set, the latest Kudu timestamp observed within this session.
+  7: optional i64 kudu_latest_observed_ts;
 }
 
 // Client request including stmt to execute and query options.
@@ -430,6 +433,10 @@ struct TInsertPartitionStatus {
 
   // Fully qualified URI to the base directory for this partition.
   4: required string partition_base_dir
+
+  // The latest observed Kudu timestamp reported by the KuduSession at this partition.
+  // This value is an unsigned int64.
+  5: optional i64 kudu_latest_observed_ts
 }
 
 // The results of an INSERT query, sent to the coordinator as part of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 2c0f3c6..df88148 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -235,7 +235,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
 
   def run_test_case(self, test_file_name, vector, use_db=None, multiple_impalad=False,
-      encoding=None, wait_secs_between_stmts=None):
+      encoding=None):
     """
     Runs the queries in the specified test based on the vector values
 
@@ -318,8 +318,6 @@ class ImpalaTestSuite(BaseTestSuite):
           if set_pattern_match != None:
             query_options_changed.append(set_pattern_match.groups()[0])
           result = self.__execute_query(target_impalad_client, query, user=user)
-          if wait_secs_between_stmts:
-            time.sleep(wait_secs_between_stmts)
       except Exception as e:
         if 'CATCH' in test_section:
           self.__verify_exceptions(test_section['CATCH'], str(e), use_db)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c01644bc/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index c22de3e..7ccb8a2 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -39,12 +39,10 @@ class TestKuduOperations(KuduTestSuite):
   """
 
   def test_kudu_scan_node(self, vector, unique_database):
-    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database,
-        wait_secs_between_stmts=1)
+    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database)
 
   def test_kudu_crud(self, vector, unique_database):
-    self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database,
-        wait_secs_between_stmts=1)
+    self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database)
 
   def test_kudu_partition_ddl(self, vector, unique_database):
     self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db=unique_database)


[5/5] incubator-impala git commit: IMPALA-4348 / IMPALA-4333: Improve coordinator fragment cancellation

Posted by he...@apache.org.
IMPALA-4348 / IMPALA-4333: Improve coordinator fragment cancellation

This patch fixes two problems:

1. If a fragment instance is cancelled (by the coordinator) concurrently
with the coordinator calling GetNext() on its PlanRootSink, GetNext() may
access a destroyed PlanRootSink object and crash (IMPALA-4333).

2. If ExecPlanFragment() times out but ultimately succeeds, the coordinator
never calls CloseConsumer() to release its side of the PlanRootSink, and
the fragment instance never finishes (IMPALA-4348).

The following changes address this:

* Make PlanFragmentExecutor::Cancel() call CloseConsumer() on its root
  sink. This ensures that the fragment instance eventually terminates no
  matter why it was cancelled; without it, the coordinator would never
  call CloseConsumer() if it could not access the root instance's
  FragmentExecState.

* Ensure that the PlanRootSink has a lifetime at least as long as the
  coordinator object by taking a shared_ptr reference to its parent
  FragmentExecState object. Even if the fragment instance finishes, the
  object will still be valid until the coordinator releases this
  reference. In the future, we expect this to be replaced by an explicit
  reference take / relinquish API.

* Ensure the coordinator tries to cancel fragment instances if any
  attempt was made to start them, not just if the RPC was a
  success. This deals with the timeout case where the RPC was slow, but
  successful.

This patch relies on a fix for IMPALA-4383, which ensures that fragment
instances will always try to send reports, and so will always detect an
error and cancel themselves.
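
A minimal sketch of the ownership and cancellation pattern described
above (hypothetical, simplified types; not Impala's actual class
definitions):

  #include <memory>

  struct PlanRootSink {
    // Idempotent; unblocks a sender stuck in Send() and a consumer in GetNext().
    void CloseConsumer() {}
  };

  struct FragmentExecState {
    PlanRootSink sink;
  };

  struct Coordinator {
    // Holding a shared_ptr keeps the sink alive for the coordinator's
    // lifetime, even if the fragment instance finishes or is cancelled while
    // the coordinator is inside GetNext().
    std::shared_ptr<FragmentExecState> root_fragment_instance_;
    PlanRootSink* root_sink_ = nullptr;

    void TakeRootInstance(std::shared_ptr<FragmentExecState> state) {
      root_fragment_instance_ = std::move(state);
      root_sink_ = &root_fragment_instance_->sink;
    }
  };

  // Executor side: cancellation also closes the consumer half, so the sender
  // can always exit Send() even if the coordinator never got the chance to
  // call CloseConsumer() itself (e.g. because ExecPlanFragment() timed out).
  void Cancel(PlanRootSink* root_sink) {
    if (root_sink != nullptr) root_sink->CloseConsumer();
  }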

Testing:

* Ran TestRPCTimeout.test_execplanfragment_timeout in a loop for 90
  minutes. Patch addresses hangs and failure modes that were observed
  previously.

* Added a manual sleep just before Coordinator::GetNext() calls
  root_sink_->GetNext(), and cancelled a query manually in that
  window. Confirmed that query cancels successfully.

Change-Id: I5b4e25c1d658b3929182ba5e56b5c5e881dd394a
Reviewed-on: http://gerrit.cloudera.org:8080/4865
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b8a0891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b8a0891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b8a0891

Branch: refs/heads/master
Commit: 3b8a0891d9eebec00fc2654bb4a705752edc6759
Parents: d82411f
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Oct 26 03:29:28 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Fri Oct 28 19:40:58 2016 +0000

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc            |  5 +--
 be/src/exec/plan-root-sink.h             | 25 +++++++-----
 be/src/runtime/coordinator.cc            | 57 +++++++++++++++++----------
 be/src/runtime/coordinator.h             |  7 +++-
 be/src/runtime/plan-fragment-executor.cc |  6 +++
 5 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index c728f4a..5f6cd75 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -152,16 +152,15 @@ void PlanRootSink::CloseConsumer() {
 Status PlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
   unique_lock<mutex> l(lock_);
-  DCHECK(!consumer_done_);
 
   results_ = results;
   num_rows_requested_ = num_results;
   sender_cv_.notify_all();
 
   while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l);
+
   *eos = eos_;
-  RETURN_IF_ERROR(state->CheckQueryState());
-  return Status::OK();
+  return state->CheckQueryState();
 }
 
 void PlanRootSink::GetRowValue(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index cc7c045..a9c47c3b 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -36,16 +36,18 @@ class ExprContext;
 ///
 /// The consumer calls GetNext() with a QueryResultSet and a requested fetch
 /// size. GetNext() shares these fields with Send(), and then signals Send() to begin
-/// populating the result set. GetNext() returns when either the sender has sent all of
-/// its rows, or the requested fetch size has been satisfied.
+/// populating the result set. GetNext() returns when a) the sender has sent all of its
+/// rows b) the requested fetch size has been satisfied or c) the sender calls Close().
 ///
 /// Send() fills in as many rows as are requested from the current batch. When the batch
 /// is exhausted - which may take several calls to GetNext() - control is returned to the
 /// sender to produce another row batch.
 ///
-/// Consumers must call CloseConsumer() when finished to allow the fragment to shut
-/// down. Senders must call Close() to signal to the consumer that no more batches will be
-/// produced.
+/// When the consumer is finished, CloseConsumer() must be called to allow the sender to
+/// exit Send(). Senders must call Close() to signal to the consumer that no more batches
+/// will be produced. CloseConsumer() may be called concurrently with GetNext(). Senders
+/// should ensure that the consumer is not blocked in GetNext() before destroying the
+/// PlanRootSink.
 ///
 /// The sink is thread safe up to a single producer and single consumer.
 ///
@@ -72,17 +74,20 @@ class PlanRootSink : public DataSink {
   virtual Status FlushFinal(RuntimeState* state);
 
   /// To be called by sender only. Signals to the consumer that no more batches will be
-  /// produced, then blocks until the consumer calls CloseConsumer().
+  /// produced, then blocks until someone calls CloseConsumer().
   virtual void Close(RuntimeState* state);
 
   /// Populates 'result_set' with up to 'num_rows' rows produced by the fragment instance
-  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume.
+  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume. If
+  /// CloseConsumer() is called concurrently, GetNext() will return and may not populate
+  /// 'result_set'. All subsequent calls after CloseConsumer() will do no work.
   Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos);
 
-  /// Signals to the producer that the sink will no longer be used. It's an error to call
-  /// GetNext() after this returns. May be called more than once; only the first call has
-  /// any effect.
+  /// Signals to the producer that the sink will no longer be used. GetNext() may be
+  /// safely called after this returns (it does nothing), but consumers should consider
+  /// that the PlanRootSink may be undergoing destruction. May be called more than once;
+  /// only the first call has any effect.
   void CloseConsumer();
 
   static const std::string NAME;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index f991e1c..7df0bf0 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -162,12 +162,14 @@ class Coordinator::FragmentInstanceState {
   }
 
   /// Called to set the initial status of the fragment instance after the
-  /// ExecRemoteFragment() RPC has returned.
-  void SetInitialStatus(const Status& status) {
+  /// ExecRemoteFragment() RPC has returned. If 'rpc_sent' is true,
+  /// CancelFragmentInstances() will include this instance in the set of potential
+  /// fragment instances to cancel.
+  void SetInitialStatus(const Status& status, bool rpc_sent) {
     DCHECK(!rpc_sent_);
+    rpc_sent_ = rpc_sent;
     status_ = status;
     if (!status_.ok()) return;
-    rpc_sent_ = true;
     stopwatch_.Start();
   }
 
@@ -244,7 +246,8 @@ class Coordinator::FragmentInstanceState {
   /// been initiated; either way, execution must not be cancelled.
   Status status_;
 
-  /// If true, ExecPlanFragment() rpc has been sent.
+  /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
+  /// successful.
   bool rpc_sent_;
 
   /// If true, execution terminated; do not cancel in that case.
@@ -502,17 +505,30 @@ Status Coordinator::Exec() {
     StartFragments();
   }
 
+  // In the error case, it's safe to return and not to get root_sink_ here to close - if
+  // there was an error, but the coordinator fragment was successfully started, it should
+  // cancel itself when it receives an error status after reporting its profile.
   RETURN_IF_ERROR(FinishInstanceStartup());
 
   // Grab executor and wait until Prepare() has finished so that runtime state etc. will
-  // be set up.
+  // be set up. Must do this here in order to get a reference to root_fragment_instance_
+  // so that root_sink_ remains valid throughout query lifetime.
   if (schedule_.GetCoordFragment() != nullptr) {
     // Coordinator fragment instance has same ID as query.
-    shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance =
+    root_fragment_instance_ =
         ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_);
-    DCHECK(root_fragment_instance.get() != nullptr);
-    executor_ = root_fragment_instance->executor();
-
+    // Fragment instance might have been failed and unregistered itself even though it was
+    // successfully started (e.g. Prepare() might have failed).
+    if (root_fragment_instance_.get() == nullptr) {
+      FragmentInstanceState* root_state = fragment_instance_states_[0];
+      DCHECK(root_state != nullptr);
+      lock_guard<mutex> instance_state_lock(*root_state->lock());
+      // Try and return the fragment instance status if it was already set.
+      // TODO: Consider waiting for root_state->done() here.
+      RETURN_IF_ERROR(*root_state->status());
+      return Status(Substitute("Root fragment instance ($0) failed", PrintId(query_id_)));
+    }
+    executor_ = root_fragment_instance_->executor();
     // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At
     // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the
     // fragment instance's executor will not complete until that point.
@@ -1417,7 +1433,7 @@ void Coordinator::MtExecRemoteFInstance(
   ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
       exec_state->impalad_address(), &client_connect_status);
   if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status);
+    exec_state->SetInitialStatus(client_connect_status, false);
     return;
   }
 
@@ -1432,7 +1448,7 @@ void Coordinator::MtExecRemoteFInstance(
     const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
         PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
@@ -1442,11 +1458,11 @@ void Coordinator::MtExecRemoteFInstance(
         PrintId(exec_state->fragment_instance_id()),
         exec_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
-  exec_state->SetInitialStatus(Status::OK());
+  exec_state->SetInitialStatus(Status::OK(), true);
   VLOG_FILE << "rpc succeeded: ExecPlanFragment"
       << " instance_id=" << PrintId(exec_state->fragment_instance_id());
 }
@@ -1480,7 +1496,7 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
   ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
       exec_state->impalad_address(), &client_connect_status);
   if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status);
+    exec_state->SetInitialStatus(client_connect_status, true);
     return;
   }
 
@@ -1497,7 +1513,7 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
         Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
           rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
@@ -1507,11 +1523,11 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
         Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
           exec_plan_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
-  exec_state->SetInitialStatus(Status::OK());
+  exec_state->SetInitialStatus(Status::OK(), true);
   return;
 }
 
@@ -1547,15 +1563,16 @@ void Coordinator::CancelFragmentInstances() {
     // to set its status)
     lock_guard<mutex> l(*exec_state->lock());
 
-    // no need to cancel if we already know it terminated w/ an error status
-    if (!exec_state->status()->ok()) continue;
-
     // Nothing to cancel if the exec rpc was not sent
     if (!exec_state->rpc_sent()) continue;
 
     // don't cancel if it already finished
     if (exec_state->done()) continue;
 
+    /// If the status is not OK, we still try to cancel - !OK status might mean
+    /// communication failure between fragment instance and coordinator, but fragment
+    /// instance might still be running.
+
     // set an error status to make sure we only cancel this once
     exec_state->set_status(Status::CANCELLED);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 066d532..176d89d 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -296,8 +296,11 @@ class Coordinator {
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Not owned by this class, created during fragment instance start-up by
-  /// FragmentExecState and set here in Exec().
+  /// Created during fragment instance start-up by FragmentExecState and set here in
+  /// Exec().  Keep a shared_ptr reference to the fragment state so that root_sink_ will
+  /// be valid for the lifetime of the coordinator. This is important if, for example, the
+  /// fragment instance is cancelled while the coordinator is calling GetNext().
+  std::shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance_;
   PlanFragmentExecutor* executor_ = nullptr;
   PlanRootSink* root_sink_ = nullptr;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index d079c36..c7b580e 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -504,6 +504,12 @@ void PlanFragmentExecutor::Cancel() {
     VLOG_QUERY << "Cancel() called before Prepare()";
     return;
   }
+
+  // Ensure that the sink is closed from both sides. Although in ordinary executions we
+  // rely on the consumer to do this, in error cases the consumer may not be able to send
+  // CloseConsumer() (see IMPALA-4348 for an example).
+  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
+
   DCHECK(runtime_state_ != NULL);
   VLOG_QUERY << "Cancel(): instance_id=" << runtime_state_->fragment_instance_id();
   runtime_state_->set_is_cancelled(true);


[3/5] incubator-impala git commit: IMPALA-4388: Fix query option reset in tests

Posted by he...@apache.org.
IMPALA-4388: Fix query option reset in tests

Before this change, using sync_ddl=1 could prevent query options from
being reset correctly. The test would open a connection to a random
impalad and execute against it, but then undo all changes to the query
options on the default connection (instead of the one used for the
test).

The fix is to undo the changes on the correct connection.

Change-Id: I82e97438ee9f4f75907704653faa884722213f5d
Reviewed-on: http://gerrit.cloudera.org:8080/4870
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2808b84a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2808b84a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2808b84a

Branch: refs/heads/master
Commit: 2808b84ad72ea58cd29d06b4b6d18f5061c93474
Parents: f7d7195
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Oct 27 13:18:34 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Oct 28 01:26:26 2016 +0000

----------------------------------------------------------------------
 tests/common/impala_test_suite.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2808b84a/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index df88148..46b7eee 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -169,14 +169,14 @@ class ImpalaTestSuite(BaseTestSuite):
     self.client.set_configuration({'sync_ddl': sync_ddl})
     self.client.execute("drop database if exists `" + db_name + "` cascade")
 
-  def __restore_query_options(self, query_options_changed):
+  def __restore_query_options(self, query_options_changed, impalad_client):
     """
     Restore the list of modified query options to their default values.
     """
     # Populate the default query option if it's empty.
     if not self.default_query_options:
       try:
-        query_options = self.client.get_default_configuration()
+        query_options = impalad_client.get_default_configuration()
         for query_option in query_options:
           self.default_query_options[query_option.key.upper()] = query_option.value
       except Exception as e:
@@ -190,7 +190,7 @@ class ImpalaTestSuite(BaseTestSuite):
       default_val = self.default_query_options[query_option]
       query_str = 'SET '+ query_option + '=' + default_val + ';'
       try:
-        self.client.execute(query_str)
+        impalad_client.execute(query_str)
       except Exception as e:
         LOG.info('Unexpected exception when executing ' + query_str + ' : ' + str(e))
 
@@ -325,7 +325,7 @@ class ImpalaTestSuite(BaseTestSuite):
         raise
       finally:
         if len(query_options_changed) > 0:
-          self.__restore_query_options(query_options_changed)
+          self.__restore_query_options(query_options_changed, target_impalad_client)
 
       if 'CATCH' in test_section:
         assert test_section['CATCH'].strip() == ''


[4/5] incubator-impala git commit: IMPALA-4383: Ensure plan fragment report thread is always started

Posted by he...@apache.org.
IMPALA-4383: Ensure plan fragment report thread is always started

PlanFragmentExecutor::report_thread_active_ was set in OpenInternal(),
after ReportProfile() (which runs in a different thread) had signalled
that it was running.

However, ReportProfile() reads report_thread_active_ to determine
whether it should exit its loop. If it runs fast enough, ReportProfile()
will never see report_thread_active_ == true, so it exits its loop
immediately and the report thread is effectively never started.

This patch moves setting report_thread_active_ to ReportProfile() -
slightly earlier, but visible at almost the same time because it is now
correctly protected by report_thread_lock_.
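
The resulting handshake follows the usual condition-variable startup
pattern: the worker thread sets the flag itself, under the lock, before
signalling that it has started. A minimal, self-contained sketch
(hypothetical names, not the executor's actual code):

  #include <chrono>
  #include <condition_variable>
  #include <mutex>
  #include <thread>

  // Sketch: names are illustrative, not Impala's actual members.
  std::mutex report_lock;
  std::condition_variable started_cv;  // "the report loop is running"
  std::condition_variable stop_cv;     // "please stop reporting"
  bool active = false;                 // only written while holding report_lock

  void ReportLoop() {
    std::unique_lock<std::mutex> l(report_lock);
    active = true;             // set by the thread that later reads it in its loop
    started_cv.notify_one();   // tell the starter we are running
    while (active) {
      // ... send a status report ...
      stop_cv.wait_for(l, std::chrono::seconds(5));
    }
  }

  void Start(std::thread* t) {
    std::unique_lock<std::mutex> l(report_lock);
    *t = std::thread(ReportLoop);
    started_cv.wait(l, [] { return active; });  // cannot miss the transition
  }

  void Stop(std::thread* t) {
    {
      std::lock_guard<std::mutex> l(report_lock);
      active = false;
      stop_cv.notify_one();
    }
    t->join();
  }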

Testing:
* TestRPCTimeout would hit this in certain configurations. Ran
  test_execplanfragment_timeout() for 90 minutes with no failures;
  previously it would fail within the first five attempts repeatedly.

Change-Id: I5d3cab4cc5245014758e6b70dec7a706a7fa929b
Reviewed-on: http://gerrit.cloudera.org:8080/4864
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d82411f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d82411f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d82411f8

Branch: refs/heads/master
Commit: d82411f81c746ec5df2f659e97e6b3ba4472676c
Parents: 2808b84
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Oct 26 02:50:07 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Fri Oct 28 19:40:32 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/plan-fragment-executor.cc | 2 +-
 be/src/runtime/plan-fragment-executor.h  | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d82411f8/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index bfb80ea..d079c36 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -313,7 +313,6 @@ Status PlanFragmentExecutor::OpenInternal() {
     // make sure the thread started up, otherwise ReportProfile() might get into a race
     // with StopReportThread()
     report_thread_started_cv_.wait(l);
-    report_thread_active_ = true;
   }
 
   OptimizeLlvmModule();
@@ -381,6 +380,7 @@ void PlanFragmentExecutor::ReportProfile() {
   DCHECK(!report_status_cb_.empty());
   unique_lock<mutex> l(report_thread_lock_);
   // tell Open() that we started
+  report_thread_active_ = true;
   report_thread_started_cv_.notify_one();
 
   // Jitter the reporting time of remote fragments by a random amount between

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d82411f8/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 82d3001..6385213 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -166,7 +166,11 @@ class PlanFragmentExecutor {
   /// Indicates that profile reporting thread started.
   /// Tied to report_thread_lock_.
   boost::condition_variable report_thread_started_cv_;
-  bool report_thread_active_;  // true if we started the thread
+
+  /// When the report thread starts, it sets 'report_thread_active_' to true and signals
+  /// 'report_thread_started_cv_'. The report thread is shut down by setting
+  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'.
+  bool report_thread_active_;
 
   /// true if Close() has been called
   bool closed_;


[2/5] incubator-impala git commit: IMPALA-4369: Avoid DCHECK in Parquet scanner with MT_DOP > 0.

Posted by he...@apache.org.
IMPALA-4369: Avoid DCHECK in Parquet scanner with MT_DOP > 0.

When HdfsParquetScanner::Open() failed, we used to hit a DCHECK
when trying to access HdfsParquetScanner::batch(), which is
only valid to call for non-MT scan nodes.

Change-Id: Ifbfdde505dbbd2742e7ab79a2415ff317a9bfa2f
Reviewed-on: http://gerrit.cloudera.org:8080/4851
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f7d71950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f7d71950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f7d71950

Branch: refs/heads/master
Commit: f7d71950e3e2ebda07f90c48d6b93b1335eaa25e
Parents: c01644b
Author: Alex Behm <al...@cloudera.com>
Authored: Tue Oct 25 17:53:59 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 26 22:21:19 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc                   |  5 ++++-
 .../queries/QueryTest/mt-dop-parquet.test            |  7 +++++++
 tests/query_test/test_mt_dop.py                      | 15 +++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7d71950/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 957338d..bf0697c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -635,7 +635,10 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
   Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
   if (status.ok()) {
     status = scanner->get()->Open(context);
-    if (!status.ok()) scanner->get()->Close(scanner->get()->batch());
+    if (!status.ok()) {
+      RowBatch* batch = (HasRowBatchQueue()) ? scanner->get()->batch() : NULL;
+      scanner->get()->Close(batch);
+    }
   } else {
     context->ClearStreams();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7d71950/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
new file mode 100644
index 0000000..39ec4b3
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet.test
@@ -0,0 +1,7 @@
+====
+---- QUERY
+# IMPALA-4369: Parquet file with invalid metadata size in the file footer.
+select * from functional_parquet.bad_metadata_len
+---- CATCH
+Invalid metadata size in file footer
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f7d71950/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 1cd6d31..515c5f8 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -45,3 +45,18 @@ class TestMtDop(ImpalaTestSuite):
     new_vector = deepcopy(vector)
     new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/mt-dop', new_vector)
+
+class TestMtDopParquet(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDopParquet, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(TestDimension('mt_dop', *MT_DOP_VALUES))
+    cls.TestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_parquet(self, vector):
+    self.run_test_case('QueryTest/mt-dop-parquet', vector)