You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/12/07 22:16:34 UTC

[1/2] impala git commit: IMPALA-4671: (part-2) Replace kudu::ServicePool with one that uses Impala threads

Repository: impala
Updated Branches:
  refs/heads/master a94d6068c -> f3fa3e017


IMPALA-4671: (part-2) Replace kudu::ServicePool with one that uses Impala threads

The KuduRPC subsystem uses kudu::ServicePool to service all incoming
RPCs. Since this lives inside the Kudu codebase, all instrumentation
of RPCs flowing through the KuduRPC subsystem is not visible to Impala,
as Kudu does not export this instrumentation, but only maintains it
internally within kudu::ServicePool.

In order to reliably view the instrumentation of all RPCs flowing through
KRPC, one option is to modify kudu::ServicePool to take their
instrumentation and display it on our webpages, which is very invasive.

A second option is to have a parallel implementation of kudu::ServicePool
which for all purposes behaves the same way, but instead has this extra
code that displays this instrumentation in our webpages.

This patch is Part 2 of the second option.

In Part 1, we simply copied the code from kudu::ServicePool into the Impala
namespace. In this patch, we rename the redundant kudu::ServicePool
(that was copied into be/src/rpc/) to ImpalaServicePool. We also expose
the instrumentaion of the ImpalaServicePool as JSON.

Now, the threads in use by the service pool will also be visible in
the /threadz page.

This patch however does not display instrumentation on the Webpages yet.
In a future patch, we will add that through the ImpalaServicePool and the
RpcMgr. Tracked by IMPALA-6269

This patch is partially based on an abandoned patch by Henry Robinson.

Change-Id: Ia3e0fa9ae22c4803b0cea736eb35e333e2671448
Reviewed-on: http://gerrit.cloudera.org:8080/8472
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/82267a27
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/82267a27
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/82267a27

Branch: refs/heads/master
Commit: 82267a2779da2165802195ba202223d2c205b73d
Parents: a94d606
Author: Sailesh Mukil <sa...@apache.org>
Authored: Mon Sep 18 14:37:59 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 7 18:29:59 2017 +0000

----------------------------------------------------------------------
 be/src/rpc/CMakeLists.txt         |   1 +
 be/src/rpc/impala-service-pool.cc | 147 +++++++++++++++------------------
 be/src/rpc/impala-service-pool.h  |  80 ++++++++----------
 be/src/rpc/rpc-mgr.cc             |  20 +++--
 be/src/rpc/rpc-mgr.h              |   4 +-
 be/src/rpc/rpc-trace.h            |   2 +-
 6 files changed, 116 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 86a39be..4234e2b 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -28,6 +28,7 @@ set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 add_library(Rpc
   authentication.cc
   ${COMMON_PROTO_SRCS}
+  impala-service-pool.cc
   rpc-mgr.cc
   rpc-trace.cc
   TAcceptQueueServer.cpp

http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 1a23ca9..3b1c02d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -15,98 +15,83 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/rpc/service_pool.h"
+#include "rpc/impala-service-pool.h"
 
+#include <boost/thread/mutex.hpp>
 #include <glog/logging.h>
 #include <memory>
 #include <string>
 #include <vector>
 
+#include "exec/kudu-util.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_queue.h"
-#include "kudu/util/logging.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
-#include "kudu/util/thread.h"
 #include "kudu/util/trace.h"
 
-using std::shared_ptr;
-using strings::Substitute;
+#include "common/names.h"
+#include "common/status.h"
 
-METRIC_DEFINE_histogram(server, rpc_incoming_queue_time,
-                        "RPC Queue Time",
-                        kudu::MetricUnit::kMicroseconds,
-                        "Number of microseconds incoming RPC requests spend in the worker queue",
-                        60000000LU, 3);
+METRIC_DEFINE_histogram(server, impala_unused,
+    "RPC Queue Time",
+    kudu::MetricUnit::kMicroseconds,
+    "Number of microseconds incoming RPC requests spend in the worker queue",
+    60000000LU, 3);
 
-METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue,
-                      "RPC Queue Timeouts",
-                      kudu::MetricUnit::kRequests,
-                      "Number of RPCs whose timeout elapsed while waiting "
-                      "in the service queue, and thus were not processed.");
+namespace impala {
 
-METRIC_DEFINE_counter(server, rpcs_queue_overflow,
-                      "RPC Queue Overflows",
-                      kudu::MetricUnit::kRequests,
-                      "Number of RPCs dropped because the service queue "
-                      "was full.");
-
-namespace kudu {
-namespace rpc {
-
-ServicePool::ServicePool(gscoped_ptr<ServiceIf> service,
-                         const scoped_refptr<MetricEntity>& entity,
+ImpalaServicePool::ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
+                         const scoped_refptr<kudu::MetricEntity>& entity,
                          size_t service_queue_length)
   : service_(std::move(service)),
     service_queue_(service_queue_length),
-    incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
-    rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
-    rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
-    closing_(false) {
+    unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
+
 }
 
-ServicePool::~ServicePool() {
+ImpalaServicePool::~ImpalaServicePool() {
   Shutdown();
 }
 
-Status ServicePool::Init(int num_threads) {
+Status ImpalaServicePool::Init(int num_threads) {
   for (int i = 0; i < num_threads; i++) {
-    scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create("service pool", "rpc worker",
-        &ServicePool::RunThread, this, &new_thread));
-    threads_.push_back(new_thread);
+    std::unique_ptr<Thread> new_thread;
+    RETURN_IF_ERROR(Thread::Create("service pool", "rpc worker",
+        &ImpalaServicePool::RunThread, this, &new_thread));
+    threads_.push_back(std::move(new_thread));
   }
   return Status::OK();
 }
 
-void ServicePool::Shutdown() {
+void ImpalaServicePool::Shutdown() {
   service_queue_.Shutdown();
 
-  MutexLock lock(shutdown_lock_);
+  lock_guard<mutex> lock(shutdown_lock_);
   if (closing_) return;
   closing_ = true;
-  // TODO: Use a proper thread pool implementation.
-  for (scoped_refptr<kudu::Thread>& thread : threads_) {
-    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  // TODO (from KRPC): Use a proper thread pool implementation.
+  for (std::unique_ptr<Thread>& thread : threads_) {
+    thread->Join();
   }
 
   // Now we must drain the service queue.
-  Status status = Status::ServiceUnavailable("Service is shutting down");
-  std::unique_ptr<InboundCall> incoming;
+  kudu::Status status = kudu::Status::ServiceUnavailable("Service is shutting down");
+  std::unique_ptr<kudu::rpc::InboundCall> incoming;
   while (service_queue_.BlockingGet(&incoming)) {
-    incoming.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+    incoming.release()->RespondFailure(
+        kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
   }
 
   service_->Shutdown();
 }
 
-void ServicePool::RejectTooBusy(InboundCall* c) {
+void ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
   string err_msg =
       Substitute("$0 request on $1 from $2 dropped due to backpressure. "
                  "The service queue is full; it has $3 items.",
@@ -114,20 +99,21 @@ void ServicePool::RejectTooBusy(InboundCall* c) {
                  service_->service_name(),
                  c->remote_address().ToString(),
                  service_queue_.max_size());
-  rpcs_queue_overflow_->Increment();
-  KLOG_EVERY_N_SECS(WARNING, 1) << err_msg;
-  c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-                    Status::ServiceUnavailable(err_msg));
-  DLOG(INFO) << err_msg << " Contents of service queue:\n"
-             << service_queue_.ToString();
+  rpcs_queue_overflow_.Add(1);
+  c->RespondFailure(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+                    kudu::Status::ServiceUnavailable(err_msg));
+  VLOG(1) << err_msg << " Contents of service queue:\n"
+          << service_queue_.ToString();
 }
 
-RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) {
+kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(
+    const kudu::rpc::RemoteMethod& method) {
   return service_->LookupMethod(method);
 }
 
-Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
-  InboundCall* c = call.release();
+kudu::Status ImpalaServicePool::QueueInboundCall(
+    gscoped_ptr<kudu::rpc::InboundCall> call) {
+  kudu::rpc::InboundCall* c = call.release();
 
   vector<uint32_t> unsupported_features;
   for (uint32_t feature : c->GetRequiredFeatures()) {
@@ -138,64 +124,66 @@ Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
 
   if (!unsupported_features.empty()) {
     c->RespondUnsupportedFeature(unsupported_features);
-    return Status::NotSupported("call requires unsupported application feature flags",
+    return kudu::Status::NotSupported("call requires unsupported application feature flags",
                                 JoinMapped(unsupported_features,
                                            [] (uint32_t flag) { return std::to_string(flag); },
                                            ", "));
   }
 
-  TRACE_TO(c->trace(), "Inserting onto call queue");
+  TRACE_TO(c->trace(), "Inserting onto call queue"); // NOLINT(*)
 
   // Queue message on service queue
-  boost::optional<InboundCall*> evicted;
+  boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
-  if (queue_status == QUEUE_FULL) {
+  if (queue_status == kudu::rpc::QueueStatus::QUEUE_FULL) {
     RejectTooBusy(c);
-    return Status::OK();
+    return kudu::Status::OK();
   }
 
   if (PREDICT_FALSE(evicted != boost::none)) {
     RejectTooBusy(*evicted);
   }
 
-  if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) {
+  if (PREDICT_TRUE(queue_status == kudu::rpc::QueueStatus::QUEUE_SUCCESS)) {
     // NB: do not do anything with 'c' after it is successfully queued --
     // a service thread may have already dequeued it, processed it, and
     // responded by this point, in which case the pointer would be invalid.
-    return Status::OK();
+    return kudu::Status::OK();
   }
 
-  Status status = Status::OK();
-  if (queue_status == QUEUE_SHUTDOWN) {
-    status = Status::ServiceUnavailable("Service is shutting down");
-    c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+  kudu::Status status = kudu::Status::OK();
+  if (queue_status == kudu::rpc::QueueStatus::QUEUE_SHUTDOWN) {
+    status = kudu::Status::ServiceUnavailable("Service is shutting down");
+    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
   } else {
-    status = Status::RuntimeError(Substitute("Unknown error from BlockingQueue: $0", queue_status));
-    c->RespondFailure(ErrorStatusPB::FATAL_UNKNOWN, status);
+    status = kudu::Status::RuntimeError(
+        Substitute("Unknown error from BlockingQueue: $0", queue_status));
+    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN, status);
   }
   return status;
 }
 
-void ServicePool::RunThread() {
+void ImpalaServicePool::RunThread() {
   while (true) {
-    std::unique_ptr<InboundCall> incoming;
+    std::unique_ptr<kudu::rpc::InboundCall> incoming;
     if (!service_queue_.BlockingGet(&incoming)) {
-      VLOG(1) << "ServicePool: messenger shutting down.";
+      VLOG(1) << "ImpalaServicePool: messenger shutting down.";
       return;
     }
 
-    incoming->RecordHandlingStarted(incoming_queue_time_);
+    // We need to call RecordHandlingStarted() to update the InboundCall timing.
+    incoming->RecordHandlingStarted(unused_histogram_);
     ADOPT_TRACE(incoming->trace());
 
     if (PREDICT_FALSE(incoming->ClientTimedOut())) {
-      TRACE_TO(incoming->trace(), "Skipping call since client already timed out");
-      rpcs_timed_out_in_queue_->Increment();
+      TRACE_TO(incoming->trace(), "Skipping call since client already timed out"); // NOLINT(*)
+      rpcs_timed_out_in_queue_.Add(1);
 
       // Respond as a failure, even though the client will probably ignore
       // the response anyway.
       incoming->RespondFailure(
-        ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-        Status::TimedOut("Call waited in the queue past client deadline"));
+        kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+        kudu::Status::TimedOut("Call waited in the queue past client deadline"));
 
       // Must release since RespondFailure above ends up taking ownership
       // of the object.
@@ -203,7 +191,7 @@ void ServicePool::RunThread() {
       continue;
     }
 
-    TRACE_TO(incoming->trace(), "Handling call");
+    TRACE_TO(incoming->trace(), "Handling call"); // NOLINT(*)
 
     // Release the InboundCall pointer -- when the call is responded to,
     // it will get deleted at that point.
@@ -211,9 +199,8 @@ void ServicePool::RunThread() {
   }
 }
 
-const string ServicePool::service_name() const {
+const string ImpalaServicePool::service_name() const {
   return service_->service_name();
 }
 
-} // namespace rpc
-} // namespace kudu
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 70611c8..93f2972 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_SERVICE_POOL_H
-#define KUDU_SERVICE_POOL_H
+#ifndef IMPALA_SERVICE_POOL_H
+#define IMPALA_SERVICE_POOL_H
 
 #include <string>
 #include <vector>
@@ -26,30 +26,20 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/rpc_service.h"
 #include "kudu/rpc/service_queue.h"
-#include "kudu/util/mutex.h"
-#include "kudu/util/thread.h"
 #include "kudu/util/status.h"
+#include "util/histogram-metric.h"
+#include "util/thread.h"
 
-namespace kudu {
-
-class Counter;
-class Histogram;
-class MetricEntity;
-class Socket;
-
-namespace rpc {
-
-class Messenger;
-class ServiceIf;
+namespace impala {
 
 // A pool of threads that handle new incoming RPC calls.
 // Also includes a queue that calls get pushed onto for handling by the pool.
-class ServicePool : public RpcService {
+class ImpalaServicePool : public kudu::rpc::RpcService {
  public:
-  ServicePool(gscoped_ptr<ServiceIf> service,
-              const scoped_refptr<MetricEntity>& metric_entity,
+  ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
+              const scoped_refptr<kudu::MetricEntity>& metric_entity,
               size_t service_queue_length);
-  virtual ~ServicePool();
+  virtual ~ImpalaServicePool();
 
   // Start up the thread pool.
   virtual Status Init(int num_threads);
@@ -57,42 +47,40 @@ class ServicePool : public RpcService {
   // Shut down the queue and the thread pool.
   virtual void Shutdown();
 
-  RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
-
-  virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) OVERRIDE;
+  kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method) override;
 
-  const Counter* RpcsTimedOutInQueueMetricForTests() const {
-    return rpcs_timed_out_in_queue_.get();
-  }
-
-  const Histogram* IncomingQueueTimeMetricForTests() const {
-    return incoming_queue_time_.get();
-  }
-
-  const Counter* RpcsQueueOverflowMetric() const {
-    return rpcs_queue_overflow_.get();
-  }
+  virtual kudu::Status
+      QueueInboundCall(gscoped_ptr<kudu::rpc::InboundCall> call) OVERRIDE;
 
   const std::string service_name() const;
 
  private:
   void RunThread();
-  void RejectTooBusy(InboundCall* c);
+  void RejectTooBusy(kudu::rpc::InboundCall* c);
+
+  std::unique_ptr<kudu::rpc::ServiceIf> service_;
+  std::vector<std::unique_ptr<Thread> > threads_;
+  kudu::rpc::LifoServiceQueue service_queue_;
+
+  // TODO: Display these metrics in the debug webpage. IMPALA-6269
+  // Number of RPCs that timed out while waiting in the service queue.
+  AtomicInt32 rpcs_timed_out_in_queue_;
+  // Number of RPCs that were rejected due to the queue being full.
+  AtomicInt32 rpcs_queue_overflow_;
 
-  gscoped_ptr<ServiceIf> service_;
-  std::vector<scoped_refptr<kudu::Thread> > threads_;
-  LifoServiceQueue service_queue_;
-  scoped_refptr<Histogram> incoming_queue_time_;
-  scoped_refptr<Counter> rpcs_timed_out_in_queue_;
-  scoped_refptr<Counter> rpcs_queue_overflow_;
+  // Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
+  // appropriate internal KRPC state. Unused otherwise.
+  // TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
+  scoped_refptr<kudu::Histogram> unused_histogram_;
 
-  mutable Mutex shutdown_lock_;
-  bool closing_;
+  // Protects against concurrent Shutdown() operations.
+  // TODO: This seems implausible given our current usage pattern. Consider removing lock.
+  boost::mutex shutdown_lock_;
+  bool closing_ = false;
 
-  DISALLOW_COPY_AND_ASSIGN(ServicePool);
+  DISALLOW_COPY_AND_ASSIGN(ImpalaServicePool);
 };
 
-} // namespace rpc
-} // namespace kudu
+} // namespace impala
 
-#endif
+#endif  // IMPALA_SERVICE_POOL_H

http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 92b065a..c2546fe 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -20,6 +20,7 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/acceptor_pool.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/util/net/net_util.h"
 #include "util/auth-util.h"
@@ -28,15 +29,14 @@
 
 #include "common/names.h"
 
+using kudu::HostPort;
+using kudu::MetricEntity;
+using kudu::rpc::AcceptorPool;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
-using kudu::rpc::AcceptorPool;
 using kudu::rpc::RpcController;
 using kudu::rpc::ServiceIf;
-using kudu::rpc::ServicePool;
 using kudu::Sockaddr;
-using kudu::HostPort;
-using kudu::MetricEntity;
 
 DECLARE_string(hostname);
 DECLARE_string(principal);
@@ -79,17 +79,19 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
     unique_ptr<ServiceIf> service_ptr) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
-  scoped_refptr<ServicePool> service_pool =
-      new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release()),
+  scoped_refptr<ImpalaServicePool> service_pool =
+      new ImpalaServicePool(std::move(service_ptr),
           messenger_->metric_entity(), service_queue_depth);
   // Start the thread pool first before registering the service in case the startup fails.
-  KUDU_RETURN_IF_ERROR(
-      service_pool->Init(num_service_threads), "Service pool failed to start");
+  RETURN_IF_ERROR(
+      service_pool->Init(num_service_threads));
   KUDU_RETURN_IF_ERROR(
       messenger_->RegisterService(service_pool->service_name(), service_pool),
       "Could not register service");
-  service_pools_.push_back(service_pool);
+
   VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name();
+  service_pools_.push_back(std::move(service_pool));
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 85f88b6..26dbae0 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -21,8 +21,8 @@
 #include "common/status.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
-#include "kudu/rpc/service_pool.h"
 #include "kudu/util/metrics.h"
+#include "rpc/impala-service-pool.h"
 
 #include "gen-cpp/Types_types.h"
 
@@ -158,7 +158,7 @@ class RpcMgr {
 
  private:
   /// One pool per registered service. scoped_refptr<> is dictated by the Kudu interface.
-  std::vector<scoped_refptr<kudu::rpc::ServicePool>> service_pools_;
+  std::vector<scoped_refptr<ImpalaServicePool>> service_pools_;
 
   /// Required Kudu boilerplate for constructing the MetricEntity passed
   /// to c'tor of ServiceIf when creating a service.

http://git-wip-us.apache.org/repos/asf/impala/blob/82267a27/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index b2861af..1bd7823 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -30,8 +30,8 @@
 
 namespace impala {
 
-class Webserver;
 class MetricGroup;
+class Webserver;
 
 /// An RpcEventHandler is called every time an Rpc is started and completed. There is at
 /// most one RpcEventHandler per ThriftServer. When an Rpc is started, getContext() creates


[2/2] impala git commit: IMPALA-6081: Fix test_basic_filters runtime profile failure

Posted by ta...@apache.org.
IMPALA-6081: Fix test_basic_filters runtime profile failure

test_basic_filters has been occasionally failing due to a line missing
from a runtime profile for a particular query.

The problem is that the query returns all of its results before all of
its fragment instances are finished executing (due to a limit). Then,
when one fragment instance reports its status, the coordinator returns
to it a 'cancelled' status, causing all remaining instances for that
backend to be cancelled.

Sometimes this cancellation happens quickly enough that the relevant
fragment instances have not yet sent a status report when they are
cancelled. They will still send a report in finalize, but as the
coordinator only updates its runtime profile for 'ok' status reports,
not 'cancelled', the final runtime profile doesn't end up with any
data for those fragment instances, which means the test does not find
the line in the runtime profile its checking for.

The fix is to have the coordinator update its runtime profile with
every status report it recieves, regardless of error status.

Testing:
- Ran existing runtime profile tests, which rely on profile output,
  in a loop.
- Manually tested some scenarios with failed queries and checked that
  the new profile output is reasonable.
- Added a new e2e test that runs the affected query and checks for the
  presence of info for all expected exec node in the profile. This
  repros the underlying issue consistently.

Change-Id: I4f581c7c8039f02a33712515c5bffab942309bba
Reviewed-on: http://gerrit.cloudera.org:8080/8754
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f3fa3e01
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f3fa3e01
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f3fa3e01

Branch: refs/heads/master
Commit: f3fa3e017f1341579d008619f57cc5dceb21603d
Parents: 82267a2
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Nov 29 16:46:38 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 7 21:07:02 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc | 32 +++++++++++-------------
 be/src/runtime/query-state.cc               |  7 +++---
 tests/query_test/test_observability.py      | 19 ++++++++++++++
 3 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 12689e0..973aa25 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -256,22 +256,21 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
         instance_exec_status.fragment_instance_id);
     // Ignore duplicate or out-of-order messages.
     if (instance_stats->done_) continue;
-    if (instance_status.ok()) {
-      instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
-      if (instance_stats->peak_mem_counter_ != nullptr) {
-        // protect against out-of-order status updates
-        peak_consumption_ =
-            max(peak_consumption_, instance_stats->peak_mem_counter_->value());
-      }
-    } else {
-      // if a query is aborted due to an error encountered by a single fragment instance,
-      // all other fragment instances will report a cancelled status; make sure not
-      // to mask the original error status
-      if (status_.ok() || status_.IsCancelled()) {
-        status_ = instance_status;
-        failed_instance_id_ = instance_exec_status.fragment_instance_id;
-        is_fragment_failure_ = true;
-      }
+
+    instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
+    if (instance_stats->peak_mem_counter_ != nullptr) {
+      // protect against out-of-order status updates
+      peak_consumption_ =
+        max(peak_consumption_, instance_stats->peak_mem_counter_->value());
+    }
+
+    // If a query is aborted due to an error encountered by a single fragment instance,
+    // all other fragment instances will report a cancelled status; make sure not to mask
+    // the original error status.
+    if (!instance_status.ok() && (status_.ok() || status_.IsCancelled())) {
+      status_ = instance_status;
+      failed_instance_id_ = instance_exec_status.fragment_instance_id;
+      is_fragment_failure_ = true;
     }
     DCHECK_GT(num_remaining_instances_, 0);
     if (instance_exec_status.done) {
@@ -454,7 +453,6 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
 void Coordinator::BackendState::InstanceStats::Update(
     const TFragmentInstanceExecStatus& exec_status,
     ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
-  DCHECK(Status(exec_status.status).ok());
   if (exec_status.done) stopwatch_.Stop();
   profile_->Update(exec_status.profile);
   if (!profile_created_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3b168ca..ae207a2 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -227,10 +227,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     status.SetTStatus(&instance_status);
     instance_status.__set_done(done);
 
-    if (fis->profile() != nullptr) {
-      fis->profile()->ToThrift(&instance_status.profile);
-      instance_status.__isset.profile = true;
-    }
+    DCHECK(fis->profile() != nullptr);
+    fis->profile()->ToThrift(&instance_status.profile);
+    instance_status.__isset.profile = true;
 
     // Only send updates to insert status if fragment is finished, the coordinator waits
     // until query execution is done to use them anyhow.

http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index cf0527b..ee7177d 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -108,3 +108,22 @@ class TestObservability(ImpalaTestSuite):
     assert "Query Options (set by configuration and planner): MEM_LIMIT=8589934592," \
         "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \
         in runtime_profile
+
+  def test_profile_fragment_instances(self):
+    """IMPALA-6081: Test that the expected number of fragment instances and their exec
+    nodes appear in the runtime profile, even when fragments may be quickly cancelled when
+    all results are already returned."""
+    results = self.execute_query("""
+        with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+        select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
+    # There are 3 scan nodes and each appears in the profile 4 times (for 3 fragment
+    # instances + the averaged fragment).
+    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12
+    # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment
+    # instance + the averaged fragment).
+    assert results.runtime_profile.count("EXCHANGE_NODE") == 6
+    # The following appear only in the root fragment which has 1 instance.
+    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2
+    assert results.runtime_profile.count("AGGREGATION_NODE") == 2
+    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2