You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/08/07 03:53:44 UTC

[impala] 02/03: IMPALA-10053: Remove uses of MonoTime::GetDeltaSince()

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7a6469e44486191cd344e9f7dcf681763d6091db
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed Aug 5 16:57:56 2020 -0700

    IMPALA-10053: Remove uses of MonoTime::GetDeltaSince()
    
    MonoTime is a utility Impala imports from Kudu. The behavior of
    MonoTime::GetDeltaSince() was accidentally flipped in
    https://gerrit.cloudera.org/#/c/14932/ so we're getting negative
    durations where we expect positive durations.
    
    The function is deprecated anyways, so this patch removes all uses of
    it and replaces them with the MonoTime '-' operator.
    
    Testing:
    - Manually ran with and without patch and inspected calculated values.
    - Added DCHECKs to prevent sucn an issue from occurring again.
    
    Change-Id: If8cd3eb51a4fd101bbe4b9c44ea9be6ea2ea0d06
    Reviewed-on: http://gerrit.cloudera.org:8080/16296
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/krpc-data-stream-recvr.cc  | 3 ++-
 be/src/runtime/krpc-data-stream-sender.cc | 4 ++++
 be/src/service/data-stream-service.cc     | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 43a13e4..97aa406 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -749,7 +749,8 @@ Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
 
 void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
-  MonoDelta duration(MonoTime::Now().GetDeltaSince(rpc_context->GetTimeReceived()));
+  MonoDelta duration(MonoTime::Now() - rpc_context->GetTimeReceived());
+  DCHECK_GE(duration.ToNanoseconds(), 0);
   dispatch_timer_->UpdateCounter(duration.ToNanoseconds());
   int use_sender_id = is_merging_ ? request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index b795310..9a0f28e 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -496,6 +496,8 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
     DCHECK(rpc_in_flight_batch_ != nullptr);
+    // 'receiver_latency_ns' is calculated with MonoTime, so it must be non-negative.
+    DCHECK_GE(resp_.receiver_latency_ns(), 0);
     int64_t row_batch_size = RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
     int64_t network_time = total_time - resp_.receiver_latency_ns();
     COUNTER_ADD(parent_->bytes_sent_counter_, row_batch_size);
@@ -628,6 +630,8 @@ void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
   int64_t total_time_ns = MonotonicNanos() - rpc_start_time_ns_;
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
+    // 'receiver_latency_ns' is calculated with MonoTime, so it must be non-negative.
+    DCHECK_GE(resp_.receiver_latency_ns(), 0);
     int64_t network_time_ns = total_time_ns - resp_.receiver_latency_ns();
     parent_->network_time_stats_->UpdateCounter(network_time_ns);
     parent_->recvr_time_stats_->UpdateCounter(eos_resp_.receiver_latency_ns());
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 76ef7ba..ceea1fa 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -143,7 +143,7 @@ void DataStreamService::PublishFilter(
 template<typename ResponsePBType>
 void DataStreamService::RespondRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
-  MonoDelta duration(MonoTime::Now().GetDeltaSince(ctx->GetTimeReceived()));
+  MonoDelta duration(MonoTime::Now() - ctx->GetTimeReceived());
   status.ToProto(response->mutable_status());
   response->set_receiver_latency_ns(duration.ToNanoseconds());
   ctx->RespondSuccess();