You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:53 UTC

[hbase] 65/133: HBASE-17629 [C++] Timeouts and retry configuration for connections

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 61d2c21df5e6f463772d5286cc5893f0c52da167
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Feb 24 16:30:52 2017 -0800

    HBASE-17629 [C++] Timeouts and retry configuration for connections
---
 hbase-native-client/bin/cpplint.sh                 |   3 +-
 .../connection/connection-factory.cc               |  19 +-
 .../connection/connection-factory.h                |  10 +-
 hbase-native-client/connection/connection-pool.cc  |   5 +-
 hbase-native-client/connection/connection-pool.h   |  19 +-
 hbase-native-client/connection/rpc-client.cc       |   4 +-
 hbase-native-client/connection/rpc-client.h        |   5 +-
 hbase-native-client/core/BUCK                      |   1 +
 hbase-native-client/core/client.cc                 |  12 +-
 hbase-native-client/core/client.h                  |  24 ++-
 .../core/connection-configuration.h                | 195 +++++++++++++++++++++
 11 files changed, 268 insertions(+), 29 deletions(-)

diff --git a/hbase-native-client/bin/cpplint.sh b/hbase-native-client/bin/cpplint.sh
index 78a00bd..81795fd 100755
--- a/hbase-native-client/bin/cpplint.sh
+++ b/hbase-native-client/bin/cpplint.sh
@@ -26,4 +26,5 @@ wget -nc $CPPLINT_LOC -O $OUTPUT
 # Execute the script
 # Exclude the following rules: build/header_guard (We use #pragma once instead)
 #                              readability/todo (TODOs are generic)
-find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo --linelength=100
+#                              build/c++11 (We are building with c++14)
+find core connection serde utils test-util security -name "*.h" -or -name "*.cc" | xargs -P8 python $OUTPUT --filter=-build/header_guard,-readability/todo,-build/c++11 --linelength=100
diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc
index 6aba351..2f7e75c 100644
--- a/hbase-native-client/connection/connection-factory.cc
+++ b/hbase-native-client/connection/connection-factory.cc
@@ -19,22 +19,32 @@
 
 #include "connection/connection-factory.h"
 
+#include <chrono>
+
 #include "connection/client-dispatcher.h"
 #include "connection/pipeline.h"
 #include "connection/service.h"
 
 using namespace folly;
 using namespace hbase;
+using std::chrono::milliseconds;
+using std::chrono::nanoseconds;
 
 ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
-                                     std::shared_ptr<Codec> codec)
-    : io_pool_(io_pool), pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
+                                     std::shared_ptr<Codec> codec,
+									 nanoseconds connect_timeout)
+    : connect_timeout_(connect_timeout),
+	  io_pool_(io_pool),
+	  pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {}
 
 std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
   auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>();
   client->group(io_pool_);
   client->pipelineFactory(pipeline_factory_);
 
+  // TODO: Opened https://github.com/facebook/wangle/issues/85 in wangle so that we can set socket
+  //  options like TCP_NODELAY, SO_KEEPALIVE, CONNECT_TIMEOUT_MILLIS, etc.
+
   return client;
 }
 std::shared_ptr<HBaseService> ConnectionFactory::Connect(
@@ -43,7 +53,10 @@ std::shared_ptr<HBaseService> ConnectionFactory::Connect(
   // Yes this will block however it makes dealing with connection pool soooooo
   // much nicer.
   // TODO see about using shared promise for this.
-  auto pipeline = client->connect(SocketAddress(hostname, port, true)).get();
+  auto pipeline = client
+                      ->connect(SocketAddress(hostname, port, true),
+                                std::chrono::duration_cast<milliseconds>(connect_timeout_))
+                      .get();
   auto dispatcher = std::make_shared<ClientDispatcher>();
   dispatcher->setPipeline(pipeline);
   return dispatcher;
diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h
index 0d1e0d0..fbcb6ef 100644
--- a/hbase-native-client/connection/connection-factory.h
+++ b/hbase-native-client/connection/connection-factory.h
@@ -20,6 +20,7 @@
 
 #include <wangle/service/Service.h>
 
+#include <chrono>
 #include <memory>
 #include <string>
 
@@ -28,6 +29,8 @@
 #include "connection/response.h"
 #include "connection/service.h"
 
+using std::chrono::nanoseconds;
+
 namespace hbase {
 
 /**
@@ -41,8 +44,10 @@ class ConnectionFactory {
    * There should only be one ConnectionFactory per client.
    */
   ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool,
-                    std::shared_ptr<Codec> codec);
-  /** Default Desctructor */
+                    std::shared_ptr<Codec> codec,
+					nanoseconds connect_timeout = nanoseconds(0));
+
+  /** Default Destructor */
   virtual ~ConnectionFactory() = default;
 
   /**
@@ -60,6 +65,7 @@ class ConnectionFactory {
       const std::string &hostname, int port);
 
  private:
+  nanoseconds connect_timeout_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool_;
   std::shared_ptr<RpcPipelineFactory> pipeline_factory_;
 };
diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 6635a6d..b18ee89 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -34,8 +34,9 @@ using folly::SharedMutexWritePriority;
 using folly::SocketAddress;
 
 ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                               std::shared_ptr<Codec> codec)
-    : cf_(std::make_shared<ConnectionFactory>(io_executor, codec)),
+                               std::shared_ptr<Codec> codec,
+							   nanoseconds connect_timeout)
+    : cf_(std::make_shared<ConnectionFactory>(io_executor, codec, connect_timeout)),
       clients_(),
       connections_(),
       map_mutex_() {}
diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h
index 23e5e9a..2a8f195 100644
--- a/hbase-native-client/connection/connection-pool.h
+++ b/hbase-native-client/connection/connection-pool.h
@@ -20,6 +20,7 @@
 
 #include <folly/SharedMutex.h>
 #include <boost/functional/hash.hpp>
+#include <chrono>
 #include <memory>
 #include <mutex>
 #include <unordered_map>
@@ -35,6 +36,8 @@ using hbase::ConnectionIdEquals;
 using hbase::ConnectionIdHash;
 using hbase::RpcConnection;
 
+using std::chrono::nanoseconds;
+
 namespace hbase {
 
 /**
@@ -47,14 +50,7 @@ class ConnectionPool {
  public:
   /** Create connection pool wit default connection factory */
   ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                 std::shared_ptr<Codec> codec);
-
-  /**
-   * Desctructor.
-   * All connections will be close.
-   * All connections will be released
-   */
-  ~ConnectionPool();
+                 std::shared_ptr<Codec> codec, nanoseconds connect_timeout = nanoseconds(0));
 
   /**
    * Constructor that allows specifiying the connetion factory.
@@ -63,6 +59,13 @@ class ConnectionPool {
   explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf);
 
   /**
+   * Destructor.
+   * All connections will be close.
+   * All connections will be released
+   */
+  ~ConnectionPool();
+
+  /**
    * Get a connection to the server name. Start time is ignored.
    * This can be a blocking operation for a short time.
    */
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index cfbda3a..c61a73e 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -40,9 +40,9 @@ class RpcChannelImplementation : public AbstractRpcChannel {
 }  // namespace hbase
 
 RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-                     std::shared_ptr<Codec> codec)
+                     std::shared_ptr<Codec> codec, nanoseconds connect_timeout)
     : io_executor_(io_executor) {
-  cp_ = std::make_shared<ConnectionPool>(io_executor_, codec);
+  cp_ = std::make_shared<ConnectionPool>(io_executor_, codec, connect_timeout);
 }
 
 void RpcClient::Close() { io_executor_->stop(); }
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index f4645a0..5c11ab5 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -26,6 +26,7 @@
 
 #include <google/protobuf/service.h>
 
+#include <chrono>
 #include <utility>
 
 using hbase::security::User;
@@ -43,6 +44,8 @@ using google::protobuf::Message;
 using google::protobuf::RpcController;
 using google::protobuf::Closure;
 
+using std::chrono::nanoseconds;
+
 class RpcChannelImplementation;
 
 namespace hbase {
@@ -52,7 +55,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
 
  public:
   RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
-            std::shared_ptr<Codec> codec);
+            std::shared_ptr<Codec> codec, nanoseconds connect_timeout);
 
   virtual ~RpcClient() { Close(); }
 
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index ce0c733..e541d8f 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -27,6 +27,7 @@ cxx_library(
         "keyvalue-codec.h",
         "region-location.h",
         "location-cache.h",
+        "connection-configuration.h",
         # TODO: move this out of exported
         # Once meta lookup works
         "meta-utils.h",
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 685524f..240da72 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -20,6 +20,7 @@
 #include "core/client.h"
 
 #include <glog/logging.h>
+#include <chrono>
 #include <exception>
 #include <utility>
 
@@ -42,9 +43,12 @@ void Client::init(const hbase::Configuration &conf) {
   conf_ = std::make_shared<hbase::Configuration>(conf);
   auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
 
-  cpu_executor_ =
-      std::make_shared<wangle::CPUThreadPoolExecutor>(4);  // TODO: read num threads from conf
-  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+  conn_conf_ = std::make_shared<hbase::ConnectionConfiguration>(*conf_);
+  // start thread pools
+  auto io_threads = conf_->GetInt(kClientIoThreadPoolSize, sysconf(_SC_NPROCESSORS_ONLN));
+  auto cpu_threads = conf_->GetInt(kClientCpuThreadPoolSize, 2 * sysconf(_SC_NPROCESSORS_ONLN));
+  cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(cpu_threads);
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(io_threads);
 
   std::shared_ptr<Codec> codec = nullptr;
   if (conf.Get(kRpcCodec, hbase::KeyValueCodec::kJavaClassName) ==
@@ -53,7 +57,7 @@ void Client::init(const hbase::Configuration &conf) {
   } else {
     LOG(WARNING) << "Not using RPC Cell Codec";
   }
-  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec);
+  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout());
   location_cache_ =
       std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool());
 }
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0e436ba..a96d6f3 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -29,6 +29,7 @@
 
 #include "connection/rpc-client.h"
 #include "core/configuration.h"
+#include "core/connection-configuration.h"
 #include "core/hbase_configuration_loader.h"
 #include "core/keyvalue-codec.h"
 #include "core/location-cache.h"
@@ -54,13 +55,13 @@ class Client {
    * @param quorum_spec Where to connect to get Zookeeper bootstrap information.
    */
   Client();
-  explicit Client(const hbase::Configuration &conf);
+  explicit Client(const hbase::Configuration& conf);
   ~Client();
   /**
    * @brief Retrieve a Table implementation for accessing a table.
    * @param - table_name
    */
-  std::unique_ptr<hbase::Table> Table(const TableName &table_name);
+  std::unique_ptr<hbase::Table> Table(const TableName& table_name);
 
   /**
    * @brief Close the Client connection.
@@ -68,16 +69,27 @@ class Client {
   void Close();
 
  private:
-  void init(const hbase::Configuration &conf);
-  const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
-  const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
-  const std::string kRpcCodec = "hbase.client.rpc.codec";
+  /** Constants */
+  static constexpr const char* kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
+  static constexpr const char* kDefHBaseZookeeperQuorum_ = "localhost:2181";
+  /** Parameter name for HBase client IO thread pool size. Defaults to num cpus */
+  static constexpr const char* kClientIoThreadPoolSize = "hbase.client.io.thread.pool.size";
+  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
+  static constexpr const char* kClientCpuThreadPoolSize = "hbase.client.cpu.thread.pool.size";
+  /** The RPC codec to encode cells. For now it is KeyValueCodec */
+  static constexpr const char* kRpcCodec = "hbase.client.rpc.codec";
+
+  /** Data */
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
   std::shared_ptr<hbase::LocationCache> location_cache_;
   std::shared_ptr<hbase::RpcClient> rpc_client_;
   std::shared_ptr<hbase::Configuration> conf_;
+  std::shared_ptr<hbase::ConnectionConfiguration> conn_conf_;
   bool is_closed_ = false;
+
+  /** Methods */
+  void init(const hbase::Configuration &conf);
 };
 
 }  // namespace hbase
diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h
new file mode 100644
index 0000000..e1e9f87
--- /dev/null
+++ b/hbase-native-client/core/connection-configuration.h
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <chrono>
+#include <climits>
+#include <string>
+
+#include "core/configuration.h"
+
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * Timeout configs.
+ */
+class ConnectionConfiguration {
+ public:
+  explicit ConnectionConfiguration(const Configuration& conf) {
+    connect_timeout_ =
+        ToNanos(conf.GetInt(kClientSocketConnectTimeout, kDefaultClientSocketConnectTimeout));
+    meta_operation_timeout_ =
+        ToNanos(conf.GetLong(kClientMetaOperationTimeout, kDefaultClientOperationTimeout));
+    operation_timeout_ =
+        ToNanos(conf.GetLong(kClientOperationTimeout, kDefaultClientOperationTimeout));
+    rpc_timeout_ = ToNanos(conf.GetLong(kRpcTimeout, kDefaultRpcTimeout));
+    read_rpc_timeout_ = ToNanos(conf.GetLong(kRpcReadTimeout, ToMillis(rpc_timeout_)));
+    write_rpc_timeout_ = ToNanos(conf.GetLong(kRpcWriteTimeout, ToMillis(rpc_timeout_)));
+    pause_ = ToNanos(conf.GetLong(kClientPause, kDefaultClientPause));
+    max_retries_ = conf.GetInt(kClientRetriesNumber, kDefaultClientRetriesNumber);
+    start_log_errors_count_ =
+        conf.GetInt(kStartLogErrorsAfterCount, kDefaultStartLogErrorsAfterCount);
+    scan_timeout_ =
+        ToNanos(conf.GetLong(kClientScannerTimeoutPeriod, kDefaultClientScannerTimeoutPeriod));
+    scanner_caching_ = conf.GetInt(kClientScannerCaching, kDefaultClientScannerCaching);
+    scanner_max_result_size_ =
+        conf.GetLong(kClientScannerMaxResultsSize, kDefaultClientScannerMaxResultsSize);
+  }
+
+  nanoseconds connect_timeout() const { return connect_timeout_; }
+
+  nanoseconds meta_operation_timeout() const { return meta_operation_timeout_; }
+
+  // timeout for a whole operation such as get, put or delete. Notice that scan will not be effected
+  // by this value, see scanTimeoutNs.
+  nanoseconds operation_timeout() const { return operation_timeout_; }
+
+  // timeout for each rpc request. Can be overridden by a more specific config, such as
+  // readRpcTimeout or writeRpcTimeout.
+  nanoseconds rpc_timeout() const { return rpc_timeout_; }
+
+  // timeout for each read rpc request
+  nanoseconds read_rpc_timeout() const { return read_rpc_timeout_; }
+
+  // timeout for each write rpc request
+  nanoseconds write_rpc_timeout() const { return write_rpc_timeout_; }
+
+  nanoseconds pause_nanos() const { return pause_; }
+
+  uint32_t max_retries() const { return max_retries_; }
+
+  /** How many retries are allowed before we start to log */
+  uint32_t start_log_errors_count() const { return start_log_errors_count_; }
+
+  // The scan timeout is used as operation timeout for every
+  // operations in a scan, such as openScanner or next.
+  nanoseconds scan_timeout() const { return scan_timeout_; }
+
+  uint32_t scanner_caching() const { return scanner_caching_; }
+
+  uint64_t scanner_max_result_size() const { return scanner_max_result_size_; }
+
+ private:
+  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
+  static constexpr const char* kClientSocketConnectTimeout =
+      "hbase.ipc.client.socket.timeout.connect";
+  /** Parameter name for HBase client CPU thread pool size. Defaults to (2 * num cpus) */
+  static constexpr const uint32_t kDefaultClientSocketConnectTimeout = 10000;  // 10 secs
+
+  /** Parameter name for HBase client operation timeout. */
+  static constexpr const char* kClientOperationTimeout = "hbase.client.operation.timeout";
+
+  /** Parameter name for HBase client meta operation timeout. */
+  static constexpr const char* kClientMetaOperationTimeout = "hbase.client.meta.operation.timeout";
+
+  /** Default HBase client operation timeout, which is tantamount to a blocking call */
+  static constexpr const uint32_t kDefaultClientOperationTimeout = 1200000;
+
+  /** timeout for each RPC */
+  static constexpr const char* kRpcTimeout = "hbase.rpc.timeout";
+
+  /** timeout for each read RPC */
+  static constexpr const char* kRpcReadTimeout = "hbase.rpc.read.timeout";
+
+  /** timeout for each write RPC */
+  static constexpr const char* kRpcWriteTimeout = "hbase.rpc.write.timeout";
+
+  static constexpr const uint32_t kDefaultRpcTimeout = 60000;
+
+  /**
+    * Parameter name for client pause value, used mostly as value to wait
+    * before running a retry of a failed get, region lookup, etc.
+    */
+  static constexpr const char* kClientPause = "hbase.client.pause";
+
+  static constexpr const uint64_t kDefaultClientPause = 100;
+
+  /**
+   * Parameter name for maximum retries, used as maximum for all retryable
+   * operations such as fetching of the root region from root region server,
+   * getting a cell's value, starting a row update, etc.
+   */
+  static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number";
+
+  static constexpr const uint32_t kDefaultClientRetriesNumber = 31;
+
+  /**
+    * Configure the number of failures after which the client will start logging. A few failures
+    * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable
+    * heuristic for the number of errors we don't log. 9 was chosen because we wait for 1s at
+    * this stage.
+    */
+  static constexpr const char* kStartLogErrorsAfterCount = "hbase.client.start.log.errors.counter";
+  static constexpr const uint32_t kDefaultStartLogErrorsAfterCount = 9;
+
+  /** The client scanner timeout period in milliseconds. */
+  static constexpr const char* kClientScannerTimeoutPeriod = "hbase.client.scanner.timeout.period";
+
+  static constexpr const uint32_t kDefaultClientScannerTimeoutPeriod = 60000;
+
+  /**
+   * Parameter name to set the default scanner caching for all clients.
+   */
+  static constexpr const char* kClientScannerCaching = "hbase.client.scanner.caching";
+
+  static constexpr const uint32_t kDefaultClientScannerCaching = INT_MAX;
+
+  /**
+   * Parameter name for maximum number of bytes returned when calling a scanner's next method.
+   * Controlled by the client.
+   */
+  static constexpr const char* kClientScannerMaxResultsSize =
+      "hbase.client.scanner.max.result.size";
+
+  /**
+   * Maximum number of bytes returned when calling a scanner's next method.
+   * Note that when a single row is larger than this limit the row is still
+   * returned completely.
+   *
+   * The default value is 2MB.
+   */
+  static constexpr const uint64_t kDefaultClientScannerMaxResultsSize = 2 * 1024 * 1024;
+
+  nanoseconds connect_timeout_;
+  nanoseconds meta_operation_timeout_;
+  nanoseconds operation_timeout_;
+  nanoseconds rpc_timeout_;
+  nanoseconds read_rpc_timeout_;
+  nanoseconds write_rpc_timeout_;
+  nanoseconds pause_;
+  uint32_t max_retries_;
+  uint32_t start_log_errors_count_;
+  nanoseconds scan_timeout_;
+  uint32_t scanner_caching_;
+  uint64_t scanner_max_result_size_;
+
+  static nanoseconds ToNanos(const uint64_t& millis) {
+    return std::chrono::duration_cast<nanoseconds>(milliseconds(millis));
+  }
+
+  static uint64_t ToMillis(const nanoseconds& nanos) {
+    return std::chrono::duration_cast<milliseconds>(nanos).count();
+  }
+};
+
+}  // namespace hbase