You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/03/12 12:45:43 UTC
[hbase] 55/133: HBASE-17315 [C++] HBase Client and Table
Implementation (Sudeep Sunthankar)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit fd0109e2291f1a27bd8198bf1c2dfa4f0b9857e7
Author: Enis Soztutar <en...@apache.org>
AuthorDate: Fri Jan 13 16:03:30 2017 -0800
HBASE-17315 [C++] HBase Client and Table Implementation (Sudeep Sunthankar)
---
hbase-native-client/connection/BUCK | 2 +
hbase-native-client/core/BUCK | 12 ++
hbase-native-client/core/client-test.cc | 240 ++++++++++++++++++++++++++++++++
hbase-native-client/core/client.cc | 46 ++++--
hbase-native-client/core/client.h | 37 ++++-
hbase-native-client/core/table.cc | 74 ++++++++++
hbase-native-client/core/table.h | 69 +++++++++
7 files changed, 462 insertions(+), 18 deletions(-)
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c22cc89..19536d5 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -30,6 +30,7 @@ cxx_library(
"rpc-connection.h",
"response.h",
"service.h",
+ "rpc-client.h",
],
srcs=[
"client-dispatcher.cc",
@@ -38,6 +39,7 @@ cxx_library(
"connection-pool.cc",
"pipeline.cc",
"request.cc",
+ "rpc-client.cc",
],
deps=[
"//if:if",
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index b7db41a..0d1bc93 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -35,6 +35,7 @@ cxx_library(
"result.h",
"request_converter.h",
"response_converter.h",
+ "table.h",
],
srcs=[
"cell.cc",
@@ -49,6 +50,7 @@ cxx_library(
"result.cc",
"request_converter.cc",
"response_converter.cc",
+ "table.cc",
],
deps=[
"//connection:connection",
@@ -107,6 +109,16 @@ cxx_test(
"//if:if",
],
run_test_separately=True,)
+# Test target built from client-test.cc; links the core library together with
+# the protobuf interfaces (//if), serde and the shared test utilities.
+cxx_test(
+ name="client-test",
+ srcs=["client-test.cc",],
+ deps=[
+ ":core",
+ "//if:if",
+ "//serde:serde",
+ "//test-util:test-util",
+ ],
+ run_test_separately=True,)
cxx_binary(
name="simple-client",
srcs=["simple-client.cc",],
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
new file mode 100644
index 0000000..0fe0225
--- /dev/null
+++ b/hbase-native-client/core/client-test.cc
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <cstdlib>
+#include <fstream>
+#include <memory>
+#include <string>
+
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/get.h"
+#include "core/hbase_configuration_loader.h"
+#include "core/result.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+
+/**
+ * Helper for the Client tests: writes placeholder HBase configuration files
+ * into a scratch directory and exports that directory through HBASE_CONF.
+ */
+class ClientTest {
+ public:
+  // Directory (with trailing slash) that receives the generated files.
+  const static std::string kDefHBaseConfPath;
+
+  // File names created inside kDefHBaseConfPath.
+  const static std::string kHBaseDefaultXml;
+  const static std::string kHBaseSiteXml;
+
+  // XML document written into both files.
+  const static std::string kHBaseXmlData;
+
+  /**
+   * @brief Write xml_data to file, replacing any previous contents.
+   * @param file Path of the file to write.
+   * @param xml_data Text stored in the file.
+   */
+  static void WriteDataToFile(const std::string &file, const std::string &xml_data) {
+    std::ofstream hbase_conf(file.c_str());
+    hbase_conf << xml_data;
+    hbase_conf.close();
+  }
+
+  /**
+   * @brief (Re)create dir + file with the given contents.
+   * @param dir Target directory, expected to end with a path separator because
+   *            the file name is appended to it verbatim.
+   * @param file File name inside dir.
+   * @param xml_data Text stored in the file (taken by reference; the original
+   *                 by-value parameter copied the whole document per call).
+   */
+  static void CreateHBaseConf(const std::string &dir, const std::string &file,
+                              const std::string &xml_data) {
+    // Directory will be created if not present
+    if (!boost::filesystem::exists(dir)) {
+      boost::filesystem::create_directories(dir);
+    }
+    // Build the full path once and reuse it for both operations.
+    const std::string conf_file = dir + file;
+    // Remove temp file always
+    boost::filesystem::remove(conf_file.c_str());
+    WriteDataToFile(conf_file, xml_data);
+  }
+
+  /**
+   * @brief Create hbase-default.xml and hbase-site.xml under kDefHBaseConfPath
+   *        and set HBASE_CONF to that directory (overwriting any old value).
+   */
+  static void CreateHBaseConfWithEnv() {
+    // Creating Empty Config Files so that we dont get a Configuration exception @Client
+    CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData);
+    CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData);
+    setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1);
+  }
+};
+
+// Scratch directory for the generated configuration; the trailing slash is
+// required because file names are appended to it directly.
+const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/");
+
+// Names of the two configuration files written by CreateHBaseConfWithEnv().
+const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml");
+const std::string ClientTest::kHBaseSiteXml("hbase-site.xml");
+
+// Configuration document consisting of the license header and an empty
+// <configuration> element (no properties are defined).
+const std::string ClientTest::kHBaseXmlData(
+ "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" "
+ "href=\"configuration.xsl\"?>\n<!--\n/**\n *\n * Licensed to the Apache "
+ "Software Foundation (ASF) under one\n * or more contributor license "
+ "agreements. See the NOTICE file\n * distributed with this work for "
+ "additional information\n * regarding copyright ownership. The ASF "
+ "licenses this file\n * to you under the Apache License, Version 2.0 "
+ "(the\n * \"License\"); you may not use this file except in compliance\n * "
+ "with the License. You may obtain a copy of the License at\n *\n * "
+ "http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by "
+ "applicable law or agreed to in writing, software\n * distributed under "
+ "the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES "
+ "OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License "
+ "for the specific language governing permissions and\n * limitations under "
+ "the License.\n "
+ "*/\n-->\n<configuration>\n\n</configuration>");
+
+// The default constructor must throw when no configuration can be loaded.
+TEST(Client, EmptyConfigurationPassedToClient) {
+  // Drop any HBASE_CONF inherited from the environment or left behind by
+  // another test in this binary (they all set it), so the outcome does not
+  // depend on test order, --gtest_repeat or --gtest_shuffle.
+  // NOTE(review): assumes the loader finds no configuration on its built-in
+  // search path once HBASE_CONF is unset - confirm against the loader.
+  unsetenv("HBASE_CONF");
+  ASSERT_ANY_THROW(hbase::Client client);
+}
+
+// A client can be built from an explicitly loaded Configuration and closed.
+TEST(Client, ConfigurationPassedToClient) {
+  // Start from a clean environment, then publish the generated config files.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Load the configuration through the default resource lookup.
+  hbase::HBaseConfigurationLoader config_loader;
+  auto loaded_conf = config_loader.LoadDefaultResources();
+
+  // Build the client from the loaded configuration and shut it down again.
+  hbase::Client hbase_client(loaded_conf.value());
+  hbase_client.Close();
+}
+
+// The default constructor succeeds once configuration files are discoverable.
+TEST(Client, DefaultConfiguration) {
+  // Start from a clean environment, then publish the generated config files.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // The client loads its own configuration here; just open and close it.
+  hbase::Client default_client;
+  default_client.Close();
+}
+
+// Reads back a row that was written through the HBase shell.
+TEST(Client, Get) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Using TestUtil to populate test data. It is owned by a unique_ptr so it is
+  // released even if a fatal assertion or an exception leaves the test early
+  // (the previous raw new/delete leaked it on those paths).
+  auto test_util = std::make_unique<hbase::TestUtil>();
+  test_util->RunShellCmd("create 't', 'd'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  auto result = table->Get(get);
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  test_util.reset();
+
+  // Test the values, should be same as in put executed on hbase shell
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ("test2", result->Row());
+  EXPECT_EQ("value2", *(result->Value("d", "2")));
+  EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
+
+  table->Close();
+  client.Close();
+}
+
+// A Get against a table that was never created must raise an exception.
+TEST(Client, GetForNonExistentTable) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // TestUtil is owned by a unique_ptr so it is released even if a fatal
+  // assertion below returns from the test early (the previous raw new/delete
+  // leaked it on those paths).
+  auto test_util = std::make_unique<hbase::TestUtil>();
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t_not_exists");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  ASSERT_ANY_THROW(table->Get(get)) << "Table does not exist. We should get an exception";
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  test_util.reset();
+
+  table->Close();
+  client.Close();
+}
+
+// A Get for a missing row in an existing table yields an empty Result.
+TEST(Client, GetForNonExistentRow) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Using TestUtil to populate test data. It is owned by a unique_ptr so it is
+  // released even if a fatal assertion or an exception leaves the test early
+  // (the previous raw new/delete leaked it on those paths).
+  auto test_util = std::make_unique<hbase::TestUtil>();
+  test_util->RunShellCmd("create 't_exists', 'd'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t_exists");
+  auto row = "row_not_exists";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  auto result = table->Get(get);
+  ASSERT_TRUE(result->IsEmpty()) << "Result should be empty.";
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  test_util.reset();
+
+  table->Close();
+  client.Close();
+}
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 0389b24..6eb3d8f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -20,27 +20,49 @@
#include "core/client.h"
#include <glog/logging.h>
-
-#include <unistd.h>
-#include <string>
-
-using namespace folly;
-using namespace std;
-using namespace hbase::pb;
+#include <exception>
+#include <utility>
namespace hbase {
-Client::Client(std::string zk_quorum)
- : cpu_executor_(std::make_shared<wangle::CPUThreadPoolExecutor>(4)),
- io_executor_(std::make_shared<wangle::IOThreadPoolExecutor>(
- sysconf(_SC_NPROCESSORS_ONLN))),
- location_cache_(zk_quorum, cpu_executor_, io_executor_) {}
+// Builds a client from the default configuration resources.
+// Throws std::runtime_error when no configuration could be loaded.
+Client::Client() {
+  HBaseConfigurationLoader default_loader;
+  auto loaded = default_loader.LoadDefaultResources();
+  if (!loaded) {
+    LOG(ERROR) << "Unable to create default Configuration object. Either hbase-default.xml or "
+                  "hbase-site.xml is absent in the search path or problems in XML parsing";
+    throw std::runtime_error("Configuration object not present.");
+  }
+
+  // Keep a shared copy of the configuration and use it to find ZooKeeper.
+  conf_ = std::make_shared<hbase::Configuration>(loaded.value());
+  auto quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
+  location_cache_ = std::make_shared<hbase::LocationCache>(quorum, cpu_executor_, io_executor_);
+}
+
+// Builds a client from a caller-supplied configuration (a private copy is kept).
+Client::Client(const hbase::Configuration &conf)
+    : conf_(std::make_shared<hbase::Configuration>(conf)) {
+  // The ZooKeeper quorum comes from the configuration, with a localhost fallback.
+  auto quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
+  location_cache_ = std::make_shared<hbase::LocationCache>(quorum, cpu_executor_, io_executor_);
+}
// We can't have the threads continue running after everything is done
// that leads to an error.
Client::~Client() {
- cpu_executor_->stop();
- io_executor_->stop();
+  // Delegate to Close(): it performs the same shutdown (both executors and the
+  // RPC client) but is guarded by is_closed_, so a client that was already
+  // closed explicitly is not shut down a second time during destruction.
+  Close();
+}
+
+// Creates a Table handle for table_name that shares this client's location
+// cache, RPC client and configuration.
+std::unique_ptr<hbase::Table> Client::Table(const TableName &table_name) {
+  auto table = std::make_unique<hbase::Table>(table_name, location_cache_, rpc_client_, conf_);
+  return table;
+}
+
+// Stops both executors and closes the RPC client. Only the first call does
+// any work; later calls return immediately.
+void Client::Close() {
+  if (!is_closed_) {
+    cpu_executor_->stop();
+    io_executor_->stop();
+    if (rpc_client_) rpc_client_->Close();
+    is_closed_ = true;
+  }
}
} // namespace hbase
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0ba1276..2bb506b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -27,11 +27,18 @@
#include <memory>
#include <string>
+#include "core/configuration.h"
+#include "core/hbase_configuration_loader.h"
#include "core/location-cache.h"
+#include "connection/rpc-client.h"
+#include "core/table.h"
+#include "serde/table-name.h"
#include "if/Cell.pb.h"
-namespace hbase {
+using hbase::pb::TableName;
+namespace hbase {
+class Table;
/**
* Client.
*
@@ -42,16 +49,34 @@ namespace hbase {
class Client {
public:
/**
- * Create a new client.
+ * @brief Create a new client from the default configuration resources.
- * @param quorum_spec Where to connect to get Zookeeper bootstrap information.
+ * @throws std::runtime_error if the default configuration cannot be loaded.
*/
- explicit Client(std::string quorum_spec);
+ Client();
+ /**
+ * @brief Create a new client from an existing configuration.
+ * @param conf Configuration to use; the client keeps its own copy.
+ */
+ explicit Client(const hbase::Configuration &conf);
~Client();
+ /**
+ * @brief Retrieve a Table implementation for accessing a table.
+ * @param table_name Name of the table to access.
+ * @return Table sharing this client's location cache, RPC client and
+ * configuration.
+ */
+ std::unique_ptr<hbase::Table> Table(const TableName &table_name);
+
+ /**
+ * @brief Close the Client connection: stops both executors and closes the
+ * RPC client. Calls after the first one do nothing.
+ */
+ void Close();
private:
- std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
- std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
- LocationCache location_cache_;
+ // Configuration key holding the ZooKeeper quorum, and the fallback value
+ // used when the key is not set.
+ const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
+ const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
+ // CPU pool with 4 threads; IO pool sized to the number of online processors.
+ std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_ =
+ std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+ std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_ =
+ std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+ // Created in the constructors once the ZooKeeper quorum is known.
+ std::shared_ptr<hbase::LocationCache> location_cache_;
+ // Shared with every Table handed out by Table().
+ std::shared_ptr<hbase::RpcClient> rpc_client_ = std::make_shared<hbase::RpcClient>();
+ std::shared_ptr<hbase::Configuration> conf_;
+ // Set by Close() so that repeated calls are no-ops.
+ bool is_closed_ = false;
};
} // namespace hbase
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
new file mode 100644
index 0000000..58125f9
--- /dev/null
+++ b/hbase-native-client/core/table.cc
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/table.h"
+
+#include <folly/futures/Future.h>
+#include <chrono>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "core/request_converter.h"
+#include "core/response_converter.h"
+#include "if/Client.pb.h"
+#include "security/user.h"
+#include "serde/server-name.h"
+
+using folly::Future;
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * @brief Create a handle for table_name on top of shared client resources.
+ * @param table_name Name of the table; a private copy is stored.
+ * @param location_cache Cache used to locate the region for a row.
+ * @param rpc_client RPC client used to send requests.
+ * @param conf Configuration; may be null, in which case defaults are kept.
+ */
+Table::Table(const TableName &table_name,
+             const std::shared_ptr<hbase::LocationCache> &location_cache,
+             const std::shared_ptr<hbase::RpcClient> &rpc_client,
+             const std::shared_ptr<hbase::Configuration> &conf)
+    : table_name_(std::make_shared<TableName>(table_name)),
+      location_cache_(location_cache),
+      rpc_client_(rpc_client),
+      conf_(conf) {
+  // Without a configuration client_retries_ keeps its in-class default (5).
+  if (conf_) {
+    // The HBase retry setting is "hbase.client.retries.number". The key read
+    // previously ("hbase.client.retries") is still honoured as a fallback so
+    // configurations written against the old name keep working.
+    const auto legacy_retries = conf_->GetInt("hbase.client.retries", client_retries_);
+    client_retries_ = conf_->GetInt("hbase.client.retries.number", legacy_retries);
+  }
+}
+
+// Nothing to release explicitly; the shared_ptr members clean up themselves.
+Table::~Table() = default;
+
+// Performs a blocking Get: locates the region for the row, sends the request
+// to the hosting server and converts the response into a Result.
+std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
+  // Region lookup waits at most 1000 ms for the meta location.
+  auto region =
+      location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000));
+  auto request = hbase::RequestConverter::ToGetRequest(get, region->region_name());
+  auto user = User::defaultUser();  // TODO: make User::current() similar to UserUtil
+
+  // Send the request to the server hosting the region and wait for the reply.
+  Future<Response> pending =
+      rpc_client_->AsyncCall(region->server_name().host_name(), region->server_name().port(),
+                             std::move(request), user, "ClientService");
+  auto response = pending.get();
+
+  return hbase::ResponseConverter::FromGetResponse(response);
+}
+
+// Closes the RPC client held by this table. Only the first call does any
+// work; later calls return immediately.
+void Table::Close() {
+  if (!is_closed_) {
+    if (rpc_client_) rpc_client_->Close();
+    is_closed_ = true;
+  }
+}
+
+} /* namespace hbase */
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
new file mode 100644
index 0000000..0e98cd2
--- /dev/null
+++ b/hbase-native-client/core/table.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/get.h"
+#include "core/location-cache.h"
+#include "core/result.h"
+#include "serde/table-name.h"
+
+using hbase::pb::TableName;
+
+namespace hbase {
+class Client;
+
+/**
+ * Handle for issuing requests against a single HBase table.
+ */
+class Table {
+ public:
+ /**
+ * @brief Create a table handle.
+ * @param table_name Name of the table; a copy is stored.
+ * @param location_cache Cache used to locate the region for a row.
+ * @param rpc_client RPC client used to send requests.
+ * @param conf Configuration; may be null.
+ */
+ Table(const TableName &table_name, const std::shared_ptr<hbase::LocationCache> &location_cache,
+ const std::shared_ptr<hbase::RpcClient> &rpc_client,
+ const std::shared_ptr<hbase::Configuration> &conf);
+ ~Table();
+
+ /**
+ * @brief - Returns a Result object for the constructed Get.
+ * The call blocks until the response has been received.
+ * @param - get Get object to perform HBase Get operation.
+ */
+ std::unique_ptr<hbase::Result> Get(const hbase::Get &get);
+
+ /**
+ * @brief - Close the client connection.
+ * Closes the RPC client held by this table; calls after the first one do
+ * nothing.
+ */
+ void Close();
+
+ private:
+ // Name of the table this handle operates on.
+ std::shared_ptr<TableName> table_name_;
+ // Resources handed in through the constructor.
+ std::shared_ptr<hbase::LocationCache> location_cache_;
+ std::shared_ptr<hbase::RpcClient> rpc_client_;
+ std::shared_ptr<hbase::Configuration> conf_;
+ // Set by Close() so that repeated calls are no-ops.
+ bool is_closed_ = false;
+ // default 5 retries. over-ridden in constructor.
+ int client_retries_ = 5;
+};
+} /* namespace hbase */