You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/19 18:28:31 UTC

[1/2] kudu git commit: Change timestamp printing to ISO-8601 (with microseconds)

Repository: kudu
Updated Branches:
  refs/heads/master ffb8fd967 -> 1f639da62


Change timestamp printing to ISO-8601 (with microseconds)

Dan raised the concern in a recent review that we're not adhering to the
standard when printing timestamps. The previous format was meant to mimic
Impala's behavior, but we also printed 'GMT', which Impala does not, so
we weren't really mimicking Impala either.

This changes timestamp printing to adhere to ISO-8601, hopefully allowing
standard date parsers to more easily process the output, if needed.

Change-Id: I421e3595af2b21eee6ec22606b6046e470559ad4
Reviewed-on: http://gerrit.cloudera.org:8080/1995
Tested-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/09bf0340
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/09bf0340
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/09bf0340

Branch: refs/heads/master
Commit: 09bf03401e2ff6b1b2c041a2e94268ec5351d7b0
Parents: ffb8fd9
Author: David Alves <da...@cloudera.com>
Authored: Tue Feb 2 12:31:54 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Sep 16 23:22:21 2016 +0000

----------------------------------------------------------------------
 src/kudu/common/types-test.cc | 10 +++++-----
 src/kudu/common/types.h       |  4 ++--
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/09bf0340/src/kudu/common/types-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/types-test.cc b/src/kudu/common/types-test.cc
index 51eedf0..ff93301 100644
--- a/src/kudu/common/types-test.cc
+++ b/src/kudu/common/types-test.cc
@@ -41,13 +41,13 @@ TEST(TestTypes, TestTimestampPrinting) {
   info->CopyMinValue(&time);
   string result;
   info->AppendDebugStringForValue(&time, &result);
-  ASSERT_EQ("-290308-12-21 19:59:05.224192 GMT", result);
+  ASSERT_EQ("-290308-12-21T19:59:05.224192Z", result);
   result = "";
 
   // Test a regular negative timestamp.
   time = -1454368523123456;
   info->AppendDebugStringForValue(&time, &result);
-  ASSERT_EQ("1923-12-01 00:44:36.876544 GMT", result);
+  ASSERT_EQ("1923-12-01T00:44:36.876544Z", result);
   result = "";
 
   // Test that passing 0 microseconds returns the correct time (0 msecs after the epoch).
@@ -55,19 +55,19 @@ TEST(TestTypes, TestTimestampPrinting) {
   // current time instead.
   time = 0;
   info->AppendDebugStringForValue(&time, &result);
-  ASSERT_EQ("1970-01-01 00:00:00.000000 GMT", result);
+  ASSERT_EQ("1970-01-01T00:00:00.000000Z", result);
   result = "";
 
   // Test a regular positive timestamp.
   time = 1454368523123456;
   info->AppendDebugStringForValue(&time, &result);
-  ASSERT_EQ("2016-02-01 23:15:23.123456 GMT", result);
+  ASSERT_EQ("2016-02-01T23:15:23.123456Z", result);
   result = "";
 
   // Test the maximum value.
   time = MathLimits<int64>::kMax;
   info->AppendDebugStringForValue(&time, &result);
-  ASSERT_EQ("294247-01-10 04:00:54.775807 GMT", result);
+  ASSERT_EQ("294247-01-10T04:00:54.775807Z", result);
 }
 
 namespace {

http://git-wip-us.apache.org/repos/asf/kudu/blob/09bf0340/src/kudu/common/types.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/types.h b/src/kudu/common/types.h
index d768b79..9bed955 100644
--- a/src/kudu/common/types.h
+++ b/src/kudu/common/types.h
@@ -416,8 +416,8 @@ struct DataTypeTraits<STRING> : public DerivedTypeTraits<BINARY>{
   }
 };
 
-static const char* kDateFormat = "%Y-%m-%d %H:%M:%S";
-static const char* kDateMicrosAndTzFormat = "%s.%06d GMT";
+static const char* kDateFormat = "%Y-%m-%dT%H:%M:%S";
+static const char* kDateMicrosAndTzFormat = "%s.%06dZ";
 
 template<>
 struct DataTypeTraits<UNIXTIME_MICROS> : public DerivedTypeTraits<INT64>{


[2/2] kudu git commit: [c++client] performance optimizations

Posted by to...@apache.org.
[c++client] performance optimizations

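The change to Batcher::ComputeDeadlineUnlocked() gave about a 50%
boost in scenarios where the session timeout is not set and write
operations are small (raw/wire size ~100 bytes): the per-call check for
an uninitialized timeout (with its rate-limited warning) is replaced by
a 60-second default set in the Batcher constructor.

Avoid calling KuduSession::shared_from_this() for every scheduled
write operation.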

Change-Id: I4b57fc7355f9f673f30861ec30cb6b48cdf656d2
Reviewed-on: http://gerrit.cloudera.org:8080/4385
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1f639da6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1f639da6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1f639da6

Branch: refs/heads/master
Commit: 1f639da62d1c995ad9a7b596160b42054757f5e1
Parents: 09bf034
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Sep 14 23:52:01 2016 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Sep 19 04:22:57 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc          | 14 +++------
 src/kudu/client/batcher.h           |  4 +--
 src/kudu/client/client.cc           |  4 +--
 src/kudu/client/session-internal.cc | 51 +++++++++++++-------------------
 src/kudu/client/session-internal.h  | 23 ++++++++++----
 5 files changed, 45 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index de6cfdc..d7c4dca 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -389,6 +389,7 @@ Batcher::Batcher(KuduClient* client,
     had_errors_(false),
     flush_callback_(nullptr),
     next_op_sequence_number_(0),
+    timeout_(MonoDelta::FromSeconds(60)),
     outstanding_lookups_(0),
     buffer_bytes_used_(0) {
 }
@@ -427,10 +428,9 @@ Batcher::~Batcher() {
   CHECK(state_ == kFlushed || state_ == kAborted) << "Bad state: " << state_;
 }
 
-void Batcher::SetTimeoutMillis(int millis) {
-  CHECK_GE(millis, 0);
+void Batcher::SetTimeout(const MonoDelta& timeout) {
   std::lock_guard<simple_spinlock> l(lock_);
-  timeout_ = MonoDelta::FromMilliseconds(millis);
+  timeout_ = timeout;
 }
 
 
@@ -478,13 +478,7 @@ void Batcher::CheckForFinishedFlush() {
 }
 
 MonoTime Batcher::ComputeDeadlineUnlocked() const {
-  MonoDelta timeout = timeout_;
-  if (PREDICT_FALSE(!timeout.Initialized())) {
-    KLOG_EVERY_N(WARNING, 1000) << "Client writing with no timeout set, using 60 seconds.\n"
-                                << GetStackTrace();
-    timeout = MonoDelta::FromSeconds(60);
-  }
-  return MonoTime::Now() + timeout;
+  return MonoTime::Now() + timeout_;
 }
 
 void Batcher::FlushAsync(KuduStatusCallback* cb) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/batcher.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index 566d356..3a52016 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -80,7 +80,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // The timeout is currently set on all of the RPCs, but in the future will be relative
   // to when the Flush call is made (eg even if the lookup of the TS takes a long time, it
   // may time out before even sending an op). TODO: implement that
-  void SetTimeoutMillis(int millis);
+  void SetTimeout(const MonoDelta& timeout);
 
   // Add a new operation to the batch. Requires that the batch has not yet been flushed.
   //
@@ -215,7 +215,7 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
 
   // Amount of time to wait for a given op, from start to finish.
   //
-  // Set by SetTimeoutMillis.
+  // Set by SetTimeout().
   MonoDelta timeout_;
 
   // After flushing, the absolute deadline for all in-flight ops.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 4609f2f..8314019 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -775,7 +775,7 @@ Status KuduSession::SetFlushMode(FlushMode m) {
     // Be paranoid in client code.
     return Status::InvalidArgument("Bad flush mode");
   }
-  return data_->SetFlushMode(m, shared_from_this());
+  return data_->SetFlushMode(m);
 }
 
 Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) {
@@ -820,7 +820,7 @@ bool KuduSession::HasPendingOperations() const {
 }
 
 Status KuduSession::Apply(KuduWriteOperation* write_op) {
-  RETURN_NOT_OK(data_->ApplyWriteOp(shared_from_this(), write_op));
+  RETURN_NOT_OK(data_->ApplyWriteOp(write_op));
   // Thread-safety note: this method should not be called concurrently
   // with other methods which modify the KuduSession::Data members, so it
   // should be safe to read KuduSession::Data members without protection.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 6fed4e0..b97fb3d 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -45,7 +45,6 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
       messenger_(std::move(messenger)),
       error_collector_(new ErrorCollector()),
       external_consistency_mode_(CLIENT_PROPAGATED),
-      timeout_ms_(-1),
       flush_interval_(MonoDelta::FromMilliseconds(1000)),
       flush_task_active_(false),
       flush_mode_(AUTO_FLUSH_SYNC),
@@ -59,7 +58,8 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
 }
 
 void KuduSession::Data::Init(weak_ptr<KuduSession> session) {
-  TimeBasedFlushInit(std::move(session));
+  session_.swap(session);
+  TimeBasedFlushInit();
 }
 
 void KuduSession::Data::FlushFinished(Batcher* batcher) {
@@ -100,7 +100,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
         "Cannot change external consistency mode when writes are buffered");
   }
   // Thread-safety note: the external_consistency_mode_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // However, the lock is needed to check for pending operations because
   // there may be pending RPCs and the background flush task may be running.
@@ -108,8 +108,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
   return Status::OK();
 }
 
-Status KuduSession::Data::SetFlushMode(FlushMode mode,
-                                       sp::weak_ptr<KuduSession> session) {
+Status KuduSession::Data::SetFlushMode(FlushMode mode) {
   {
     std::lock_guard<Mutex> l(mutex_);
     if (HasPendingOperationsUnlocked()) {
@@ -129,7 +128,7 @@ Status KuduSession::Data::SetFlushMode(FlushMode mode,
     flush_mode_ = mode;
   }
 
-  TimeBasedFlushInit(std::move(session));
+  TimeBasedFlushInit();
 
   return Status::OK();
 }
@@ -142,7 +141,7 @@ Status KuduSession::Data::SetBufferBytesLimit(size_t size) {
         "Cannot change buffer size limit when writes are buffered.");
   }
   // Thread-safety note: the buffer_bytes_limit_ is not supposed to be accessed
-  // or modified from any other thread of control: no thread-safety is assumed
+  // or modified from any other thread: no thread-safety is assumed
   // for the kudu::KuduSession interface. Due to the latter reason,
   // there should not be any threads waiting on conditions which are affected
   // by the change, so signalling other threads isn't necessary here.
@@ -165,7 +164,7 @@ Status KuduSession::Data::SetBufferFlushWatermark(int watermark_pct) {
         "Cannot change buffer flush watermark when writes are buffered.");
   }
   // Thread-safety note: the buffer_watermark_pct_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // Due to the latter reason, there should not be any threads waiting on
   // conditions which are affected by the setting, so no signalling
@@ -199,7 +198,7 @@ Status KuduSession::Data::SetMaxBatchersNum(unsigned int max_num) {
         "Cannot change the limit on maximum number of batchers when writes are buffered.");
   }
   // Thread-safety note: the batchers_num_limit_ is not supposed
-  // to be accessed or modified from any other thread of control:
+  // to be accessed or modified from any other thread:
   // no thread-safety is assumed for the kudu::KuduSession interface.
   // Due to the latter reason, there should not be any threads waiting
   // on conditions which are affected by the setting, so no signalling
@@ -216,9 +215,9 @@ void KuduSession::Data::SetTimeoutMillis(int timeout_ms) {
   }
   {
     std::lock_guard<Mutex> l(mutex_);
-    timeout_ms_ = timeout_ms;
+    timeout_ = MonoDelta::FromMilliseconds(timeout_ms);
     if (batcher_) {
-      batcher_->SetTimeoutMillis(timeout_ms);
+      batcher_->SetTimeout(timeout_);
     }
   }
 }
@@ -272,7 +271,7 @@ void KuduSession::Data::FlushCurrentBatcher(int64_t watermark,
   scoped_refptr<Batcher> batcher_to_flush;
   {
     std::lock_guard<Mutex> l(mutex_);
-    if (batcher_ && batcher_->buffer_bytes_used() >= watermark) {
+    if (PREDICT_TRUE(batcher_) && batcher_->buffer_bytes_used() >= watermark) {
       batcher_to_flush.swap(batcher_);
     }
   }
@@ -319,10 +318,7 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
 // from this this method, the operation must end up either in the corresponding
 // batcher (success path) or in the error collector (failure path). Otherwise
 // it would be a memory leak.
-Status KuduSession::Data::ApplyWriteOp(
-    sp::weak_ptr<KuduSession> weak_session,
-    KuduWriteOperation* write_op) {
-
+Status KuduSession::Data::ApplyWriteOp(KuduWriteOperation* write_op) {
   if (PREDICT_FALSE(!write_op)) {
     return Status::InvalidArgument("NULL operation");
   }
@@ -337,14 +333,7 @@ Status KuduSession::Data::ApplyWriteOp(
   // Get 'wire size' of the write operation.
   const int64_t required_size = Batcher::GetOperationSizeInBuffer(write_op);
 
-  // Thread-safety note: the buffer_bytes_limit_ and
-  // buffer_watermark_pct_ are not supposed to be modified
-  // from any other thread of control since no thread-safety is advertised
-  // for the kudu::KuduSession interface.
-  // So, no protection while accessing those members.
   const size_t max_size = buffer_bytes_limit_;
-  const size_t flush_watermark =
-      buffer_bytes_limit_ * buffer_watermark_pct_ / 100;
   // Thread-safety note: the flush_mode_ is accessed from the background
   // time-based flush task for reading. Practically, it would be possible
   // to get away with not protecting the flush_mode_ since it's read-only
@@ -426,13 +415,12 @@ Status KuduSession::Data::ApplyWriteOp(
       DCHECK(!batcher_);
       // Thread-safety note: the external_consistecy_mode_ and timeout_ms_
       // are not supposed to be accessed or modified from any other thread
-      // of control since no thread-safety is advertised
-      // for the kudu::KuduSession interface.
+      // since no thread-safety is advertised for the kudu::KuduSession interface.
       scoped_refptr<Batcher> batcher(
-          new Batcher(client_.get(), error_collector_, std::move(weak_session),
+          new Batcher(client_.get(), error_collector_, session_,
                       external_consistency_mode_));
-      if (timeout_ms_ != -1) {
-        batcher->SetTimeoutMillis(timeout_ms_);
+      if (timeout_.Initialized()) {
+        batcher->SetTimeout(timeout_);
       }
       batcher.swap(batcher_);
       ++batchers_num_;
@@ -448,6 +436,8 @@ Status KuduSession::Data::ApplyWriteOp(
   }
 
   if (flush_mode == AUTO_FLUSH_BACKGROUND) {
+    const size_t flush_watermark =
+        buffer_bytes_limit_ * buffer_watermark_pct_ / 100;
     // In AUTO_FLUSH_BACKGROUND mode it's necessary to flush the newly added
     // operations if the flush watermark is reached. The current batcher is
     // the exclusive and the only container for the newly added operations.
@@ -459,10 +449,9 @@ Status KuduSession::Data::ApplyWriteOp(
   return Status::OK();
 }
 
-void KuduSession::Data::TimeBasedFlushInit(
-    sp::weak_ptr<KuduSession> weak_session) {
+void KuduSession::Data::TimeBasedFlushInit() {
   KuduSession::Data::TimeBasedFlushTask(
-      Status::OK(), messenger_, std::move(weak_session), true);
+      Status::OK(), messenger_, session_, true);
 }
 
 void KuduSession::Data::TimeBasedFlushTask(

http://git-wip-us.apache.org/repos/asf/kudu/blob/1f639da6/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index fe2225c..66a70a6 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -60,7 +60,7 @@ class KuduSession::Data {
   void Init(sp::weak_ptr<KuduSession> session);
 
   // Called by Batcher when a flush has finished.
-  void FlushFinished(internal::Batcher* b);
+  void FlushFinished(internal::Batcher* batcher);
 
   // Returns Status::IllegalState() if 'force' is false and there are still pending
   // operations. If 'force' is true batcher_ is aborted even if there are pending
@@ -68,7 +68,7 @@ class KuduSession::Data {
   Status Close(bool force);
 
   // Set flush mode for the session.
-  Status SetFlushMode(FlushMode mode, sp::weak_ptr<KuduSession> session);
+  Status SetFlushMode(FlushMode mode);
 
   // Set external consistency mode for the session.
   Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);
@@ -123,11 +123,10 @@ class KuduSession::Data {
   MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);
 
   // Apply a write operation, i.e. push it through the batcher chain.
-  Status ApplyWriteOp(sp::weak_ptr<KuduSession> session,
-                      KuduWriteOperation* write_op);
+  Status ApplyWriteOp(KuduWriteOperation* write_op);
 
   // Check and start the time-based flush task in background, if necessary.
-  void TimeBasedFlushInit(sp::weak_ptr<KuduSession> weak_session);
+  void TimeBasedFlushInit();
 
   // The self-rescheduling task to flush write operations which have been
   // accumulating for too long (controlled by flush_interval_).
@@ -158,6 +157,12 @@ class KuduSession::Data {
   // The client that this session is associated with.
   const sp::shared_ptr<KuduClient> client_;
 
+  // Weak reference to the containing session. The reference is weak to
+  // avoid circular referencing.  The reference to the KuduSession object
+  // is needed by batchers and time-based flush task: being run in independent
+  // threads, they need to make sure the object is alive before accessing it.
+  sp::weak_ptr<KuduSession> session_;
+
   // The reference to the client's messenger (keeping the reference instead of
   // declaring friendship to KuduClient and accessing it via the client_).
   std::weak_ptr<rpc::Messenger> messenger_;
@@ -168,7 +173,7 @@ class KuduSession::Data {
   kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_;
 
   // Timeout for the next batch.
-  int timeout_ms_;
+  MonoDelta timeout_;
 
   // Interval for the max-wait flush background task.
   MonoDelta flush_interval_;  // protected by mutex_
@@ -207,12 +212,18 @@ class KuduSession::Data {
   // operations. The buffer is a virtual entity: there isn't contiguous place
   // in the memory which would contain that 'buffered' data. Instead, buffer's
   // data is spread across all pending operations in all active batchers.
+  // Thread-safety note: buffer_bytes_limit_ is not supposed to be modified
+  // from any other thread since no thread-safety is advertised for the
+  // kudu::KuduSession interface.
   size_t buffer_bytes_limit_;
 
   // The high-watermark level as the percentage of the buffer space used by
   // freshly added (not-yet-scheduled-for-flush) write operations.
   // Once the level is reached, the BackgroundFlusher triggers flushing
   // of accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode.
+  // Thread-safety note: buffer_watermark_pct_ is not supposed to be modified
+  // from any other thread since no thread-safety is advertised for the
+  // kudu::KuduSession interface.
   int32_t buffer_watermark_pct_;
 
   // The total number of bytes used by buffered write operations.