You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/28 22:30:41 UTC

[impala] 02/07: IMPALA-5397: Set query's end_time_us_ when the operation completes

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 793e8192efae3b777717408b41b3787e681968c6
Author: poojanilangekar <po...@cloudera.com>
AuthorDate: Wed Feb 20 17:22:03 2019 -0800

    IMPALA-5397: Set query's end_time_us_ when the operation completes
    
    Previously, a query's end time was set only when UnregisterQuery()
    was called. This was misleading because the query could have
    completed earlier. After this change, if the query has a
    coordinator, the end time is set when the query releases its
    admission control resources. For queries without coordinators
    (e.g. DDL queries), the end time is still set when
    UnregisterQuery() is called.
    
    Testing:
    Added a test to ensure that the end time is set before the query
    is closed for a query with a coordinator. It also ensures that for
    a query without a coordinator, the end time is set only when the
    query is closed.
    Tested queries with various settings of --idle-query-timeout.
    Ran exhaustive end-to-end tests and the stress test without
    any failures.
    
    Change-Id: Iaa9a1b443df3dbd95f9f297c2f923ad795b14745
    Reviewed-on: http://gerrit.cloudera.org:8080/12583
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc          | 10 +++++++---
 be/src/runtime/coordinator.h           |  8 +++++++-
 be/src/service/client-request-state.cc | 19 +++++++++++-------
 be/src/service/client-request-state.h  | 19 ++++++++++++++----
 tests/common/impala_test_suite.py      |  2 +-
 tests/query_test/test_observability.py | 36 ++++++++++++++++++++++++++++++++++
 6 files changed, 78 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1579ada..998017d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -43,6 +43,7 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "scheduling/query-schedule.h"
+#include "service/client-request-state.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -69,9 +70,10 @@ using namespace impala;
 // Maximum number of fragment instances that can publish each broadcast filter.
 static const int MAX_BROADCAST_FILTER_PRODUCERS = 3;
 
-Coordinator::Coordinator(
-    const QuerySchedule& schedule, RuntimeProfile::EventSequence* events)
-  : schedule_(schedule),
+Coordinator::Coordinator(ClientRequestState* parent, const QuerySchedule& schedule,
+    RuntimeProfile::EventSequence* events)
+  : parent_request_state_(parent),
+    schedule_(schedule),
     filter_mode_(schedule.query_options().runtime_filter_mode),
     obj_pool_(new ObjectPool()),
     query_events_(events) {}
@@ -564,6 +566,8 @@ void Coordinator::HandleExecStateTransition(
     CancelBackends();
   }
   ReleaseAdmissionControlResources();
+  // Once the query has released its admission control resources, update its end time.
+  parent_request_state_->UpdateEndTime();
   // Can compute summary only after we stop accepting reports from the backends. Both
   // WaitForBackends() and CancelBackends() ensures that.
   // TODO: should move this off of the query execution path?
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index b040120..2135b92 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -37,6 +37,7 @@
 namespace impala {
 
 class CountingBarrier;
+class ClientRequestState;
 class FragmentInstanceState;
 class MemTracker;
 class ObjectPool;
@@ -97,7 +98,8 @@ class TUpdateCatalogRequest;
 /// and unnest them
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
-  Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
+  Coordinator(ClientRequestState* parent, const QuerySchedule& schedule,
+      RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
   /// Initiate asynchronous execution of a query with the given schedule. When it
@@ -218,6 +220,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   class FilterState;
   class FragmentStats;
 
+  /// The parent ClientRequestState object for this coordinator. The reference is set in
+  /// the constructor. It always outlives this coordinator.
+  ClientRequestState* parent_request_state_;
+
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index b1070bd..f8c1b95 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -525,7 +525,7 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(admit_status).ok()) return;
   }
-  coord_.reset(new Coordinator(*schedule_, query_events_));
+  coord_.reset(new Coordinator(this, *schedule_, query_events_));
   Status exec_status = coord_->Exec();
 
   DebugActionNoFail(schedule_->query_options(), "CRS_AFTER_COORD_STARTS");
@@ -734,14 +734,9 @@ void ClientRequestState::Done() {
     }
   }
 
+  UpdateEndTime();
   unique_lock<mutex> l(lock_);
-  end_time_us_ = UnixMicros();
-  // Certain API clients expect Start Time and End Time to be date-time strings
-  // of nanosecond precision, so we explicitly specify the precision here.
-  summary_profile_->AddInfoString("End Time", ToStringFromUnixMicros(end_time_us(),
-      TimePrecision::Nanosecond));
   query_events_->MarkEvent("Unregister query");
-
   // Update result set cache metrics, and update mem limit accounting before tearing
   // down the coordinator.
   ClearResultCache();
@@ -1313,4 +1308,14 @@ void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK(coord_.get());
   coord_->UpdateFilter(params);
 }
+
+void ClientRequestState::UpdateEndTime() {
+  // Update the query's end time only if it isn't set previously.
+  if (end_time_us_.CompareAndSwap(0, UnixMicros())) {
+    // Certain API clients expect Start Time and End Time to be date-time strings
+    // of nanosecond precision, so we explicitly specify the precision here.
+    summary_profile_->AddInfoString(
+        "End Time", ToStringFromUnixMicros(end_time_us(), TimePrecision::Nanosecond));
+  }
+}
 }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 2a2338e..1d9042a 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -223,7 +223,7 @@ class ClientRequestState {
   const RuntimeProfile* profile() const { return profile_; }
   const RuntimeProfile* summary_profile() const { return summary_profile_; }
   int64_t start_time_us() const { return start_time_us_; }
-  int64_t end_time_us() const { return end_time_us_; }
+  int64_t end_time_us() const { return end_time_us_.Load(); }
   const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
   const TQueryOptions& query_options() const {
     return query_ctx_.client_request.query_options;
@@ -259,7 +259,17 @@ class ClientRequestState {
   RuntimeProfile::EventSequence* query_events() const { return query_events_; }
   RuntimeProfile* summary_profile() { return summary_profile_; }
 
+ protected:
+  /// Updates the end_time_us_ of this query if it isn't set. The end time is determined
+  /// when this function is called for the first time; calling it multiple times does not
+  /// change the end time.
+  void UpdateEndTime();
+
  private:
+  /// The coordinator is a friend class because it needs to be able to call
+  /// UpdateEndTime() when a query's admission control resources are released.
+  friend class Coordinator;
+
   const TQueryCtx query_ctx_;
 
   /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are
@@ -424,10 +434,11 @@ class ClientRequestState {
   ImpalaServer* parent_server_;
 
   /// Start/end time of the query, in Unix microseconds.
+  int64_t start_time_us_;
   /// end_time_us_ is initialized to 0, which is used to indicate that the query is not
-  /// yet done. It is assinged the final value in
-  /// ClientRequestState::Done().
-  int64_t start_time_us_, end_time_us_ = 0;
+  /// yet done. It is assigned the final value in ClientRequestState::Done() or when the
+  /// coordinator releases its admission control resources.
+  AtomicInt64 end_time_us_{0};
 
   /// Executes a local catalog operation (an operation that does not need to execute
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 06299e4..9600820 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -644,7 +644,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
   @execute_wrapper
   def execute_query_async(self, query, query_options=None):
-    self.client.set_configuration(query_options)
+    if query_options is not None: self.client.set_configuration(query_options)
     return self.client.execute_async(query)
 
   @execute_wrapper
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index d25170f..b2db889 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -515,3 +515,39 @@ class TestObservability(ImpalaTestSuite):
 
     assert len(end_time_sub_sec_str) == 9, end_time
     assert len(start_time_sub_sec_str) == 9, start_time
+
+  @pytest.mark.execute_serially
+  def test_end_time(self):
+    """ Test that verifies that the end time of a query with a coordinator is set once
+    the coordinator releases its admission control resources. This ensures that the
+    duration of the query will be determined by the time taken to do real work rather
+    than the duration for which the query remains open. On the other hand, for queries
+    without coordinators, the End Time is set only when UnregisterQuery() is called."""
+    # Test the end time of a query with a coordinator.
+    query = "select 1"
+    handle = self.execute_query_async(query)
+    result = self.client.fetch(query, handle)
+    # Ensure that the query returns a non-empty result set.
+    assert result is not None
+    # Once the results have been fetched, the query End Time must be set.
+    query_id = handle.get_handle().id
+    tree = self._get_thrift_profile(query_id)
+    end_time = tree.nodes[1].info_strings["End Time"]
+    assert end_time is not None
+    self.client.close_query(handle)
+    # Test the end time of a query without a coordinator.
+    query = "describe functional.alltypes"
+    handle = self.execute_query_async(query)
+    result = self.client.fetch(query, handle)
+    # Ensure that the query returns a non-empty result set.
+    assert result is not None
+    # The query End Time must not be set until the query is unregistered.
+    query_id = handle.get_handle().id
+    tree = self._get_thrift_profile(query_id)
+    end_time = tree.nodes[1].info_strings["End Time"]
+    assert len(end_time) == 0, end_time
+    self.client.close_query(handle)
+    # The query End Time must be set after the query is unregistered
+    tree = self._get_thrift_profile(query_id)
+    end_time = tree.nodes[1].info_strings["End Time"]
+    assert end_time is not None