You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2017/12/07 17:49:53 UTC

[4/4] impala git commit: IMPALA-4671: (part-1) Copy kudu::ServicePool into Impala namespace

IMPALA-4671: (part-1) Copy kudu::ServicePool into Impala namespace

*** THIS PATCH IS NOT FOR REVIEW. IT IS JUST UPLOADED TO MAKE THE
REVIEW OF PART-2 EASIER IN GERRIT BY SHOWING ONLY THE DIFFS OF THAT
PATCH AGAINST THIS PATCH. MORE EXPLANATION BELOW***

The KuduRPC subsystem uses kudu::ServicePool to service all incoming
RPCs. Since this class lives inside the Kudu codebase, none of the
instrumentation of RPCs flowing through the KuduRPC subsystem is visible
to Impala: Kudu does not export this instrumentation, but only maintains
it internally within kudu::ServicePool.

To reliably view the instrumentation of all RPCs flowing through
KRPC, one option is to modify kudu::ServicePool to expose its
instrumentation so that we can display it on our webpages; however,
this is very invasive.

A second option is to have a parallel implementation of kudu::ServicePool
that behaves the same way for all practical purposes, but additionally
contains the code that displays this instrumentation on our webpages.

This patch is Part 1 of the second option. In this patch, we simply copy
the kudu::ServicePool code into be/src/rpc/.

In the next patch (Part 2), we rename it to ImpalaServicePool,
and we do the instrumentation such that it shows up on the
webpage.

We split the change into these two parts to make it easier to review.

Change-Id: I5bb927d4726d4fe418251b9734a4e232810fef69
Reviewed-on: http://gerrit.cloudera.org:8080/8471
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Sailesh Mukil <sa...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a94d6068
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a94d6068
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a94d6068

Branch: refs/heads/master
Commit: a94d6068c757912eaa741db82331ef72e5696004
Parents: a4916e6
Author: Sailesh Mukil <sa...@apache.org>
Authored: Fri Nov 3 11:11:53 2017 -0700
Committer: Sailesh Mukil <sa...@cloudera.com>
Committed: Thu Dec 7 14:49:45 2017 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc | 219 +++++++++++++++++++++++++++++++++
 be/src/rpc/impala-service-pool.h  |  98 +++++++++++++++
 2 files changed, 317 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a94d6068/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
new file mode 100644
index 0000000..1a23ca9
--- /dev/null
+++ b/be/src/rpc/impala-service-pool.cc
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_pool.h"
+
+#include <glog/logging.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+using std::shared_ptr;
+using strings::Substitute;
+
+// Server-level metric prototypes used by the service pool. Each one is
+// instantiated against the pool's MetricEntity in the ServicePool constructor.
+
+// Time an incoming RPC spends waiting in the service queue before a worker
+// thread picks it up. NOTE(review): the two trailing arguments are presumably
+// the histogram's maximum trackable value (60000000 us, i.e. 60 s) and its
+// number of significant digits; confirm against kudu/util/metrics.h.
+METRIC_DEFINE_histogram(
+    server, rpc_incoming_queue_time, "RPC Queue Time",
+    kudu::MetricUnit::kMicroseconds,
+    "Number of microseconds incoming RPC requests spend in the worker queue",
+    60000000LU, 3);
+
+// Bumped by RunThread() when a dequeued call's client has already timed out.
+METRIC_DEFINE_counter(
+    server, rpcs_timed_out_in_queue, "RPC Queue Timeouts",
+    kudu::MetricUnit::kRequests,
+    "Number of RPCs whose timeout elapsed while waiting "
+    "in the service queue, and thus were not processed.");
+
+// Bumped by RejectTooBusy() every time a call is dropped for backpressure.
+METRIC_DEFINE_counter(
+    server, rpcs_queue_overflow, "RPC Queue Overflows",
+    kudu::MetricUnit::kRequests,
+    "Number of RPCs dropped because the service queue "
+    "was full.");
+
+namespace kudu {
+namespace rpc {
+
+// Wraps 'service' (taking ownership of it) in a pool whose queue is created
+// with 'service_queue_length'. The three queue metrics are instantiated
+// against 'entity'. No worker threads exist yet: Init() creates them.
+ServicePool::ServicePool(gscoped_ptr<ServiceIf> service,
+                         const scoped_refptr<MetricEntity>& entity,
+                         size_t service_queue_length)
+    : service_(std::move(service)),
+      service_queue_(service_queue_length),
+      incoming_queue_time_(
+          METRIC_rpc_incoming_queue_time.Instantiate(entity)),
+      rpcs_timed_out_in_queue_(
+          METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
+      rpcs_queue_overflow_(
+          METRIC_rpcs_queue_overflow.Instantiate(entity)),
+      closing_(false) {}
+
+// Destroying the pool always performs a full Shutdown(). The call is
+// qualified explicitly: while this destructor runs, a virtual call would
+// resolve to this class's implementation anyway, so this just makes that
+// visible.
+ServicePool::~ServicePool() {
+  ServicePool::Shutdown();
+}
+
+// Starts 'num_threads' worker threads, each of which runs RunThread() to pull
+// calls off the service queue.
+//
+// Returns the error from kudu::Thread::Create() if a worker cannot be
+// started, rather than aborting the process via CHECK_OK. This lets the
+// declared Status return type actually report failures to the caller.
+// Workers started before the failure stay in 'threads_'. Shutdown() joins
+// them, and the destructor also calls Shutdown().
+Status ServicePool::Init(int num_threads) {
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    KUDU_RETURN_NOT_OK(kudu::Thread::Create("service pool", "rpc worker",
+        &ServicePool::RunThread, this, &new_thread));
+    threads_.push_back(new_thread);
+  }
+  return Status::OK();
+}
+
+// Shuts down the service queue, joins every worker thread, fails any calls
+// still left in the queue, and finally shuts down the wrapped service.
+// Only the first caller performs the join/drain/service-shutdown work. Later
+// calls, including the one made by the destructor, return as soon as they
+// observe 'closing_' under 'shutdown_lock_'.
+void ServicePool::Shutdown() {
+  // Runs on every call, outside 'shutdown_lock_'. Shutting the queue down is
+  // presumably what unblocks the workers: RunThread() returns once
+  // BlockingGet() reports false.
+  service_queue_.Shutdown();
+
+  MutexLock lock(shutdown_lock_);
+  if (closing_) return;
+  closing_ = true;
+  // TODO: Use a proper thread pool implementation.
+  // Wait for every worker to leave RunThread() before draining the queue
+  // below. NOTE(review): CHECK_OK aborts the process if a join fails.
+  for (scoped_refptr<kudu::Thread>& thread : threads_) {
+    CHECK_OK(ThreadJoiner(thread.get()).Join());
+  }
+
+  // Now we must drain the service queue.
+  // Each remaining call is failed with FATAL_SERVER_SHUTTING_DOWN. release()
+  // is used because RespondFailure() ends up owning the call (the same
+  // handoff RunThread() relies on). NOTE(review): this loop assumes
+  // BlockingGet() still yields queued items after Shutdown() and returns
+  // false once the queue is empty — confirm against LifoServiceQueue.
+  Status status = Status::ServiceUnavailable("Service is shutting down");
+  std::unique_ptr<InboundCall> incoming;
+  while (service_queue_.BlockingGet(&incoming)) {
+    incoming.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+  }
+
+  service_->Shutdown();
+}
+
+// Drops call 'c' because the service queue is full. It bumps the overflow
+// counter, logs a rate-limited warning, and answers the caller with
+// ERROR_SERVER_TOO_BUSY. In debug builds it also dumps the queue contents.
+void ServicePool::RejectTooBusy(InboundCall* c) {
+  const std::string msg = Substitute(
+      "$0 request on $1 from $2 dropped due to backpressure. "
+      "The service queue is full; it has $3 items.",
+      c->remote_method().method_name(), service_->service_name(),
+      c->remote_address().ToString(), service_queue_.max_size());
+
+  rpcs_queue_overflow_->Increment();
+  KLOG_EVERY_N_SECS(WARNING, 1) << msg;
+
+  const Status too_busy = Status::ServiceUnavailable(msg);
+  c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, too_busy);
+
+  DLOG(INFO) << msg << " Contents of service queue:\n"
+             << service_queue_.ToString();
+}
+
+// Method lookup is delegated entirely to the wrapped service.
+RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) {
+  ServiceIf* const svc = service_.get();
+  return svc->LookupMethod(method);
+}
+
+// Takes ownership of 'call' and either queues it for a worker thread or
+// answers it right away:
+//  - If the call requires feature flags the service does not support, it is
+//    answered via RespondUnsupportedFeature() and NotSupported is returned.
+//  - If the queue is full, the call is rejected as too busy and OK is
+//    returned.
+//  - If the queue is shut down, the call fails with FATAL_SERVER_SHUTTING_DOWN
+//    and ServiceUnavailable is returned.
+//  - Any other queue status yields FATAL_UNKNOWN / RuntimeError.
+// NOTE(review): 'c' is never deleted here on the early-response paths, so
+// ownership presumably passes to the Respond*() call — confirm in InboundCall.
+Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+  InboundCall* c = call.release();
+
+  // Collect every feature flag the call requires but the service lacks.
+  vector<uint32_t> unsupported_features;
+  for (uint32_t feature : c->GetRequiredFeatures()) {
+    if (!service_->SupportsFeature(feature)) {
+      unsupported_features.push_back(feature);
+    }
+  }
+
+  if (!unsupported_features.empty()) {
+    c->RespondUnsupportedFeature(unsupported_features);
+    return Status::NotSupported("call requires unsupported application feature flags",
+                                JoinMapped(unsupported_features,
+                                           [] (uint32_t flag) { return std::to_string(flag); },
+                                           ", "));
+  }
+
+  TRACE_TO(c->trace(), "Inserting onto call queue");
+
+  // Queue message on service queue
+  // Put() may also report a different call in 'evicted' (presumably an older
+  // queued call displaced to make room); that call is rejected as too busy.
+  boost::optional<InboundCall*> evicted;
+  auto queue_status = service_queue_.Put(c, &evicted);
+  if (queue_status == QUEUE_FULL) {
+    RejectTooBusy(c);
+    return Status::OK();
+  }
+
+  if (PREDICT_FALSE(evicted != boost::none)) {
+    RejectTooBusy(*evicted);
+  }
+
+  if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) {
+    // NB: do not do anything with 'c' after it is successfully queued --
+    // a service thread may have already dequeued it, processed it, and
+    // responded by this point, in which case the pointer would be invalid.
+    return Status::OK();
+  }
+
+  // The call was not queued, so it is still ours to fail.
+  Status status = Status::OK();
+  if (queue_status == QUEUE_SHUTDOWN) {
+    status = Status::ServiceUnavailable("Service is shutting down");
+    c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+  } else {
+    status = Status::RuntimeError(Substitute("Unknown error from BlockingQueue: $0", queue_status));
+    c->RespondFailure(ErrorStatusPB::FATAL_UNKNOWN, status);
+  }
+  return status;
+}
+
+// Worker-thread body; Init() starts each worker thread running this loop.
+// Each iteration takes ownership of one queued call:
+//  - If the call's client has already timed out, the call is failed with
+//    ERROR_SERVER_TOO_BUSY and counted in 'rpcs_timed_out_in_queue_'.
+//  - Every other call is handed to the wrapped service.
+// Returns once BlockingGet() reports that the queue has been shut down.
+void ServicePool::RunThread() {
+  while (true) {
+    std::unique_ptr<InboundCall> incoming;
+    if (!service_queue_.BlockingGet(&incoming)) {
+      VLOG(1) << "ServicePool: messenger shutting down.";
+      return;
+    }
+
+    // Presumably records the call's queue wait into 'incoming_queue_time_' and
+    // makes the call's trace current for this thread's TRACE macros.
+    // NOTE(review): confirm against InboundCall and kudu/util/trace.h.
+    incoming->RecordHandlingStarted(incoming_queue_time_);
+    ADOPT_TRACE(incoming->trace());
+
+    // Don't spend a worker on a call whose client deadline has already passed.
+    if (PREDICT_FALSE(incoming->ClientTimedOut())) {
+      TRACE_TO(incoming->trace(), "Skipping call since client already timed out");
+      rpcs_timed_out_in_queue_->Increment();
+
+      // Respond as a failure, even though the client will probably ignore
+      // the response anyway.
+      incoming->RespondFailure(
+        ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+        Status::TimedOut("Call waited in the queue past client deadline"));
+
+      // Must release since RespondFailure above ends up taking ownership
+      // of the object.
+      ignore_result(incoming.release());
+      continue;
+    }
+
+    TRACE_TO(incoming->trace(), "Handling call");
+
+    // Release the InboundCall pointer -- when the call is responded to,
+    // it will get deleted at that point.
+    service_->Handle(incoming.release());
+  }
+}
+
+// Reports the name of the wrapped service, as given by the service itself.
+const std::string ServicePool::service_name() const {
+  ServiceIf* const svc = service_.get();
+  return svc->service_name();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/a94d6068/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
new file mode 100644
index 0000000..70611c8
--- /dev/null
+++ b/be/src/rpc/impala-service-pool.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_SERVICE_POOL_H
+#define KUDU_SERVICE_POOL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Histogram;
+class MetricEntity;
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+class ServiceIf;
+
+// A pool of threads that handle new incoming RPC calls.
+// Also includes a queue that calls get pushed onto for handling by the pool.
+//
+// NOTE(review): this is a verbatim copy of Kudu's ServicePool. It still lives
+// in namespace kudu::rpc under the same class name, so until it is renamed it
+// presumably must not be linked into a binary that also contains Kudu's own
+// ServicePool.
+class ServicePool : public RpcService {
+ public:
+  // Takes ownership of 'service'. The queue metrics are instantiated against
+  // 'metric_entity', and 'service_queue_length' sizes the service queue.
+  // No worker threads run until Init() is called.
+  ServicePool(gscoped_ptr<ServiceIf> service,
+              const scoped_refptr<MetricEntity>& metric_entity,
+              size_t service_queue_length);
+
+  // Performs a full Shutdown().
+  virtual ~ServicePool();
+
+  // Start up the thread pool with 'num_threads' worker threads.
+  virtual Status Init(int num_threads);
+
+  // Shut down the queue and the thread pool. Only the first call does the
+  // work; subsequent calls return without repeating it.
+  virtual void Shutdown();
+
+  // Delegates method lookup to the wrapped service.
+  RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
+
+  // Takes ownership of 'call' and either queues it for a worker thread or
+  // responds to it immediately (unsupported features, full or shut-down
+  // queue).
+  Status QueueInboundCall(gscoped_ptr<InboundCall> call) override;
+
+  // Metric accessors. The returned pointers stay owned by this pool.
+  const Counter* RpcsTimedOutInQueueMetricForTests() const {
+    return rpcs_timed_out_in_queue_.get();
+  }
+
+  const Histogram* IncomingQueueTimeMetricForTests() const {
+    return incoming_queue_time_.get();
+  }
+
+  const Counter* RpcsQueueOverflowMetric() const {
+    return rpcs_queue_overflow_.get();
+  }
+
+  // Name of the wrapped service.
+  const std::string service_name() const;
+
+ private:
+  // Worker-thread loop: dequeues calls and hands them to the service until
+  // the queue is shut down.
+  void RunThread();
+
+  // Fails 'c' with ERROR_SERVER_TOO_BUSY because the service queue is full.
+  void RejectTooBusy(InboundCall* c);
+
+  gscoped_ptr<ServiceIf> service_;
+  std::vector<scoped_refptr<kudu::Thread> > threads_;
+  LifoServiceQueue service_queue_;
+  scoped_refptr<Histogram> incoming_queue_time_;
+  scoped_refptr<Counter> rpcs_timed_out_in_queue_;
+  scoped_refptr<Counter> rpcs_queue_overflow_;
+
+  // Protects 'closing_', which records that Shutdown() has already run.
+  mutable Mutex shutdown_lock_;
+  bool closing_;
+
+  DISALLOW_COPY_AND_ASSIGN(ServicePool);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif