Posted to commits@impala.apache.org by jo...@apache.org on 2020/08/07 03:53:42 UTC

[impala] branch master updated (dbbd403 -> bbec044)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from dbbd403  IMPALA-10005: Fix Snappy decompression for non-block filesystems
     new 86b70e9  IMPALA-9851: Truncate long error message.
     new 7a6469e  IMPALA-10053: Remove uses of MonoTime::GetDeltaSince()
     new bbec044  IMPALA-10044: Fix cleanup for bootstrap_toolchain.py failure case

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/bufferpool/buffer-pool-internal.h |   3 +
 be/src/runtime/bufferpool/buffer-pool-test.cc    |  54 +++++++++++
 be/src/runtime/bufferpool/buffer-pool.cc         |  24 +++--
 be/src/runtime/bufferpool/buffer-pool.h          |   1 +
 be/src/runtime/krpc-data-stream-recvr.cc         |   3 +-
 be/src/runtime/krpc-data-stream-sender.cc        |   4 +
 be/src/service/data-stream-service.cc            |   2 +-
 be/src/util/error-util-test.cc                   |   7 ++
 be/src/util/error-util.cc                        | 112 ++++++++++++-----------
 be/src/util/error-util.h                         |  11 ++-
 be/src/util/internal-queue.h                     |  13 +++
 bin/bootstrap_toolchain.py                       |   3 +-
 12 files changed, 172 insertions(+), 65 deletions(-)


[impala] 01/03: IMPALA-9851: Truncate long error message.

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 86b70e9850cce0b45194a64cd89ae21df0e82029
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Aug 5 17:03:08 2020 -0700

    IMPALA-9851: Truncate long error message.
    
    Error message length was unbounded and could grow to a couple of MB.
    This patch truncates error messages to a maximum of 128 KB.
    
    This patch also fixes a potentially long error message produced by
    BufferPool::Client::DebugString(). Before this patch, DebugString()
    printed every page in the 'pinned_pages_', 'dirty_unpinned_pages_',
    and 'in_flight_write_pages_' PageLists. With this patch, DebugString()
    includes at most the first 100 pages of each PageList.
    
    Testing:
    - Add be test BufferPoolTest.ShortDebugString
    - Add a truncation check to ErrorMsg.GenericFormatting.
    - Run and pass core tests.
    
    Change-Id: Ic9fa4d024fb3dc9de03c7484f41b5e420a710e5a
    Reviewed-on: http://gerrit.cloudera.org:8080/16300
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/bufferpool/buffer-pool-internal.h |   3 +
 be/src/runtime/bufferpool/buffer-pool-test.cc    |  54 +++++++++++
 be/src/runtime/bufferpool/buffer-pool.cc         |  24 +++--
 be/src/runtime/bufferpool/buffer-pool.h          |   1 +
 be/src/util/error-util-test.cc                   |   7 ++
 be/src/util/error-util.cc                        | 112 ++++++++++++-----------
 be/src/util/error-util.h                         |  11 ++-
 be/src/util/internal-queue.h                     |  13 +++
 8 files changed, 163 insertions(+), 62 deletions(-)

diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index c2caf7b..20d7767 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -182,6 +182,9 @@ class BufferPool::PageList {
   }
 
   void Iterate(boost::function<bool(Page*)> fn) { list_.Iterate(fn); }
+  void IterateFirstN(boost::function<bool(Page*)> fn, int n) {
+    list_.IterateFirstN(fn, n);
+  }
   bool Contains(Page* page) { return list_.Contains(page); }
   Page* tail() { return list_.tail(); }
   bool empty() const { return list_.empty(); }
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 2c9add7..611963c 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -2353,6 +2353,60 @@ TEST_F(BufferPoolTest, BufferPoolGc) {
   buffer_pool->FreeBuffer(&client, &buffer);
   buffer_pool->DeregisterClient(&client);
 }
+
+/// IMPALA-9851: Cap the number of pages that can be printed at
+/// BufferPool::Client::DebugString().
+TEST_F(BufferPoolTest, ShortDebugString) {
+  // Allocate pages more than BufferPool::MAX_PAGE_ITER_DEBUG.
+  int num_pages = 105;
+  int64_t max_page_len = TEST_BUFFER_LEN;
+  int64_t total_mem = num_pages * max_page_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, total_mem, total_mem);
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+      total_mem, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(total_mem));
+
+  vector<BufferPool::PageHandle> handles(num_pages);
+
+  // Create pages of various valid sizes.
+  for (int i = 0; i < num_pages; ++i) {
+    int64_t page_len = TEST_BUFFER_LEN;
+    int64_t used_before = client.GetUsedReservation();
+    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].is_pinned());
+    const BufferHandle* buffer;
+    ASSERT_OK(handles[i].GetBuffer(&buffer));
+    ASSERT_TRUE(buffer->data() != NULL);
+    ASSERT_EQ(handles[i].len(), page_len);
+    ASSERT_EQ(buffer->len(), page_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
+  }
+
+  // Verify that only subset of pages are included in DebugString().
+  string page_count_substr = Substitute(
+      "$0 out of $1 pinned pages:", BufferPool::MAX_PAGE_ITER_DEBUG, num_pages);
+  string debug_string = client.DebugString();
+  ASSERT_NE(debug_string.find(page_count_substr), string::npos)
+      << page_count_substr << " not found at BufferPool::Client::DebugString(). "
+      << debug_string;
+
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_pages; ++i) {
+    int64_t used_before = client.GetUsedReservation();
+    int page_len = handles[i].len();
+    pool.DestroyPage(&client, &handles[i]);
+    ASSERT_EQ(client.GetUsedReservation(), used_before - page_len);
+  }
+
+  pool.DeregisterClient(&client);
+
+  // All the reservations should be released at this point.
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
 } // namespace impala
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 1c900cb..687f937 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -42,6 +42,7 @@ namespace impala {
 
 constexpr int BufferPool::LOG_MAX_BUFFER_BYTES;
 constexpr int64_t BufferPool::MAX_BUFFER_BYTES;
+constexpr int BufferPool::MAX_PAGE_ITER_DEBUG;
 
 void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len, int home_core) {
   DCHECK_LE(0, home_core);
@@ -808,12 +809,23 @@ string BufferPool::Client::DebugStringLocked() {
       this, name_, write_status_.GetDetail(), buffers_allocated_bytes_, num_pages_,
       pinned_pages_.bytes(), dirty_unpinned_pages_.bytes(),
       in_flight_write_pages_.bytes(), reservation_.DebugString());
-  ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
-  pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
-  ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
-  dirty_unpinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
-  ss << "\n  " << in_flight_write_pages_.size() << " in flight write pages: ";
-  in_flight_write_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
+  int page_to_print = min(pinned_pages_.size(), BufferPool::MAX_PAGE_ITER_DEBUG);
+  ss << "\n  " << page_to_print << " out of " << pinned_pages_.size()
+     << " pinned pages: ";
+  pinned_pages_.IterateFirstN(
+      bind<bool>(Page::DebugStringCallback, &ss, _1), page_to_print);
+
+  page_to_print = min(dirty_unpinned_pages_.size(), MAX_PAGE_ITER_DEBUG);
+  ss << "\n  " << page_to_print << " out of " << dirty_unpinned_pages_.size()
+     << " dirty unpinned pages: ";
+  dirty_unpinned_pages_.IterateFirstN(
+      bind<bool>(Page::DebugStringCallback, &ss, _1), page_to_print);
+
+  page_to_print = min(in_flight_write_pages_.size(), MAX_PAGE_ITER_DEBUG);
+  ss << "\n  " << page_to_print << " out of " << in_flight_write_pages_.size()
+     << " in flight write pages: ";
+  in_flight_write_pages_.IterateFirstN(
+      bind<bool>(Page::DebugStringCallback, &ss, _1), page_to_print);
   return ss.str();
 }
 
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 6c81b2a..04fcdbe 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -302,6 +302,7 @@ class BufferPool : public CacheLineAligned {
   /// power-of-two buffer sizes.
   static constexpr int LOG_MAX_BUFFER_BYTES = 48;
   static constexpr int64_t MAX_BUFFER_BYTES = 1L << LOG_MAX_BUFFER_BYTES;
+  static constexpr int MAX_PAGE_ITER_DEBUG = 100;
 
  protected:
   friend class BufferPoolTest;
diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc
index deec5d4..6b8c0ef 100644
--- a/be/src/util/error-util-test.cc
+++ b/be/src/util/error-util-test.cc
@@ -27,6 +27,8 @@
 
 namespace impala {
 
+constexpr int ErrorMsg::MAX_ERROR_MESSAGE_LEN;
+
 TEST(ErrorMsg, GenericFormatting) {
   ErrorMsg msg(TErrorCode::GENERAL, "This is a test");
   ASSERT_EQ("This is a test", msg.msg());
@@ -39,6 +41,11 @@ TEST(ErrorMsg, GenericFormatting) {
   msg = ErrorMsg(TErrorCode::MISSING_BUILTIN, "fun", "sym");
   ASSERT_EQ("Builtin 'fun' with symbol 'sym' does not exist. Verify that "
       "all your impalads are the same version.", msg.msg());
+
+  // Test long error message and truncation.
+  string long_msg = std::string(256 * 1024, '-'); // 256kb string
+  msg = ErrorMsg(TErrorCode::GENERAL, long_msg);
+  ASSERT_EQ(ErrorMsg::MAX_ERROR_MESSAGE_LEN, msg.msg().size());
 }
 
 TEST(ErrorMsg, MergeMap) {
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index 27fde1f..430f2c8 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "util/error-util-internal.h"
+#include "util/string-util.h"
 
 #include <errno.h>
 #include <string.h>
@@ -55,74 +56,79 @@ string GetTablesMissingStatsWarning(const vector<TTableName>& tables_missing_sta
   return ss.str();
 }
 
-ErrorMsg::ErrorMsg(TErrorCode::type error)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_])) {}
+ErrorMsg::ErrorMsg(TErrorCode::type error) : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_]));
+}
 
-ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0)) {}
+ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0) : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1)) {}
+  : error_(error) {
+  SetErrorMsg(
+      strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0, arg1));
+}
 
-ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2)) {}
+ErrorMsg::ErrorMsg(
+    TErrorCode::type error, const ArgType& arg0, const ArgType& arg1, const ArgType& arg2)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(
+      g_ErrorCodes_constants.TErrorMessage[error_], arg0, arg1, arg2));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
     const ArgType& arg2, const ArgType& arg3)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3)) {}
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(
+      g_ErrorCodes_constants.TErrorMessage[error_], arg0, arg1, arg2, arg3));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
     const ArgType& arg2, const ArgType& arg3, const ArgType& arg4)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4)) {}
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(
+      g_ErrorCodes_constants.TErrorMessage[error_], arg0, arg1, arg2, arg3, arg4));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4, arg5)) {}
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(
+      g_ErrorCodes_constants.TErrorMessage[error_], arg0, arg1, arg2, arg3, arg4, arg5));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4, arg5, arg6)) {}
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0,
+      arg1, arg2, arg3, arg4, arg5, arg6));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7)) {}
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0,
+      arg1, arg2, arg3, arg4, arg5, arg6, arg7));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-    const ArgType& arg8)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8)) {}
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7, const ArgType& arg8)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0,
+      arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
+}
 
 ErrorMsg::ErrorMsg(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-    const ArgType& arg8, const ArgType& arg9)
-    : error_(error),
-      message_(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_],
-              arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9)) {}
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7, const ArgType& arg8, const ArgType& arg9)
+  : error_(error) {
+  SetErrorMsg(strings::Substitute(g_ErrorCodes_constants.TErrorMessage[error_], arg0,
+      arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9));
+}
 
 ErrorMsg ErrorMsg::Init(TErrorCode::type error, const ArgType& arg0,
     const ArgType& arg1, const ArgType& arg2, const ArgType& arg3,
@@ -132,9 +138,8 @@ ErrorMsg ErrorMsg::Init(TErrorCode::type error, const ArgType& arg0,
   ErrorCodesConstants error_strings;
   ErrorMsg m;
   m.error_ = error;
-  m.message_ = strings::Substitute(error_strings.TErrorMessage[m.error_],
-      arg0, arg1, arg2, arg3, arg4, arg5,
-      arg6, arg7, arg8, arg9);
+  m.SetErrorMsg(strings::Substitute(error_strings.TErrorMessage[m.error_], arg0, arg1,
+      arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9));
   return m;
 }
 
@@ -217,6 +222,11 @@ size_t ErrorCount(const ErrorLogMap& errors) {
   return errors.size() + cit->second.messages_size() - 1;
 }
 
+void ErrorMsg::SetErrorMsg(const std::string& msg) {
+  Status status = TruncateDown(msg, ErrorMsg::MAX_ERROR_MESSAGE_LEN, &message_);
+  DCHECK_OK(status);
+}
+
 string ErrorMsg::GetFullMessageDetails() const {
   stringstream ss;
   ss << message_ << "\n";
diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h
index 2b0e280..355376e 100644
--- a/be/src/util/error-util.h
+++ b/be/src/util/error-util.h
@@ -52,6 +52,8 @@ std::string GetTablesMissingStatsWarning(
 /// as strings to the message. These details should only be accessed internally.
 class ErrorMsg {
  public:
+  static constexpr int MAX_ERROR_MESSAGE_LEN = 128 * 1024; // 128kb
+
   typedef strings::internal::SubstituteArg ArgType;
 
   /// Trivial constructor.
@@ -117,11 +119,6 @@ class ErrorMsg {
     error_ = e;
   }
 
-  /// Set a specific error message.
-  void SetErrorMsg(const std::string& msg) {
-    message_ = msg;
-  }
-
   /// Return the formatted error string.
   const std::string& msg() const {
     return message_;
@@ -131,6 +128,10 @@ class ErrorMsg {
     return details_;
   }
 
+  /// Set a specific error message. Truncate the message if the length is longer than
+  /// MAX_ERROR_MESSAGE_LEN.
+  void SetErrorMsg(const std::string& msg);
+
   /// Produce a string representation of the error message that includes the formatted
   /// message of the original error and the attached detail strings.
   std::string GetFullMessageDetails() const;
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 85a46e1..3430b02 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -274,6 +274,19 @@ class InternalQueueBase {
     }
   }
 
+  // Iterate over first 'n' elements of queue, calling 'fn' for each element. If 'n' is
+  // larger than the size of the queue, iteration will finish after the last element
+  // reached. If 'fn' returns false, terminate iteration. It is invalid to call other
+  // InternalQueue methods from 'fn'.
+  void IterateFirstN(boost::function<bool(T*)> fn, int n) {
+    std::lock_guard<LockType> lock(lock_);
+    for (Node* current = head_; (current != nullptr) && (n > 0);
+         current = current->next) {
+      if (!fn(reinterpret_cast<T*>(current))) return;
+      n--;
+    }
+  }
+
   /// Prints the queue ptrs to a string.
   std::string DebugString() {
     std::stringstream ss;


[impala] 02/03: IMPALA-10053: Remove uses of MonoTime::GetDeltaSince()

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7a6469e44486191cd344e9f7dcf681763d6091db
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Wed Aug 5 16:57:56 2020 -0700

    IMPALA-10053: Remove uses of MonoTime::GetDeltaSince()
    
    MonoTime is a utility Impala imports from Kudu. The behavior of
    MonoTime::GetDeltaSince() was accidentally flipped in
    https://gerrit.cloudera.org/#/c/14932/ so we're getting negative
    durations where we expect positive durations.
    
    The function is deprecated anyway, so this patch removes all uses of
    it and replaces them with the MonoTime '-' operator.
    
    Testing:
    - Manually ran with and without patch and inspected calculated values.
    - Added DCHECKs to catch such an issue if it occurs again.
    
    Change-Id: If8cd3eb51a4fd101bbe4b9c44ea9be6ea2ea0d06
    Reviewed-on: http://gerrit.cloudera.org:8080/16296
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/krpc-data-stream-recvr.cc  | 3 ++-
 be/src/runtime/krpc-data-stream-sender.cc | 4 ++++
 be/src/service/data-stream-service.cc     | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 43a13e4..97aa406 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -749,7 +749,8 @@ Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
 
 void KrpcDataStreamRecvr::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
-  MonoDelta duration(MonoTime::Now().GetDeltaSince(rpc_context->GetTimeReceived()));
+  MonoDelta duration(MonoTime::Now() - rpc_context->GetTimeReceived());
+  DCHECK_GE(duration.ToNanoseconds(), 0);
   dispatch_timer_->UpdateCounter(duration.ToNanoseconds());
   int use_sender_id = is_merging_ ? request->sender_id() : 0;
   // Add all batches to the same queue if is_merging_ is false.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index b795310..9a0f28e 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -496,6 +496,8 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
     DCHECK(rpc_in_flight_batch_ != nullptr);
+    // 'receiver_latency_ns' is calculated with MonoTime, so it must be non-negative.
+    DCHECK_GE(resp_.receiver_latency_ns(), 0);
     int64_t row_batch_size = RowBatch::GetSerializedSize(*rpc_in_flight_batch_);
     int64_t network_time = total_time - resp_.receiver_latency_ns();
     COUNTER_ADD(parent_->bytes_sent_counter_, row_batch_size);
@@ -628,6 +630,8 @@ void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
   int64_t total_time_ns = MonotonicNanos() - rpc_start_time_ns_;
   const kudu::Status controller_status = rpc_controller_.status();
   if (LIKELY(controller_status.ok())) {
+    // 'receiver_latency_ns' is calculated with MonoTime, so it must be non-negative.
+    DCHECK_GE(resp_.receiver_latency_ns(), 0);
     int64_t network_time_ns = total_time_ns - resp_.receiver_latency_ns();
     parent_->network_time_stats_->UpdateCounter(network_time_ns);
     parent_->recvr_time_stats_->UpdateCounter(eos_resp_.receiver_latency_ns());
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 76ef7ba..ceea1fa 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -143,7 +143,7 @@ void DataStreamService::PublishFilter(
 template<typename ResponsePBType>
 void DataStreamService::RespondRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
-  MonoDelta duration(MonoTime::Now().GetDeltaSince(ctx->GetTimeReceived()));
+  MonoDelta duration(MonoTime::Now() - ctx->GetTimeReceived());
   status.ToProto(response->mutable_status());
   response->set_receiver_latency_ns(duration.ToNanoseconds());
   ctx->RespondSuccess();


[impala] 03/03: IMPALA-10044: Fix cleanup for bootstrap_toolchain.py failure case

Posted by jo...@apache.org.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bbec0443fcdabf5de6f7ae0e47595414503f30f0
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Aug 5 14:02:30 2020 -0700

    IMPALA-10044: Fix cleanup for bootstrap_toolchain.py failure case
    
    If DownloadUnpackTarball::download()'s wget_and_unpack_package call
    hits an exception, the exception handler cleans up any created
    directories. Currently, it erroneously deletes the directory where
    the tarballs are downloaded even when that directory is not a
    temporary directory but the toolchain's destination base directory,
    which deletes the entire toolchain.
    
    This fixes the cleanup to delete that directory only if it is a
    temporary directory.
    
    Testing:
     - Simulated an exception from wget_and_unpack_package and verified
       the cleanup behavior.
    
    Change-Id: Ia57f56b6717635af94247fce50b955c07a57d113
    Reviewed-on: http://gerrit.cloudera.org:8080/16294
    Reviewed-by: Laszlo Gaal <la...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/bootstrap_toolchain.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 647fc00..5d59da1 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -182,7 +182,8 @@ class DownloadUnpackTarball(object):
       # Clean up any partially-unpacked result.
       if os.path.isdir(unpack_dir):
         shutil.rmtree(unpack_dir)
-      if os.path.isdir(download_dir):
+      # Only delete the download directory if it is a temporary directory
+      if download_dir != self.destination_basedir and os.path.isdir(download_dir):
         shutil.rmtree(download_dir)
       raise
     if self.makedir: