You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/03/29 23:09:29 UTC
hbase git commit: HBASE-17771 [C++] Classes required for
implementation of BatchCallerBuilder
Repository: hbase
Updated Branches:
refs/heads/HBASE-14850 915d89f51 -> 8c7a8b9da
HBASE-17771 [C++] Classes required for implementation of BatchCallerBuilder
Signed-off-by: Enis Soztutar <en...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8c7a8b9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8c7a8b9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8c7a8b9d
Branch: refs/heads/HBASE-14850
Commit: 8c7a8b9da24a69810f2e18fdaae551b9d017c8cc
Parents: 915d89f
Author: Sudeep Sunthankar <su...@hashmapinc.com>
Authored: Wed Mar 29 16:55:21 2017 +1100
Committer: Enis Soztutar <en...@apache.org>
Committed: Wed Mar 29 16:08:55 2017 -0700
----------------------------------------------------------------------
hbase-native-client/connection/request.cc | 4 +
hbase-native-client/connection/request.h | 2 +
hbase-native-client/core/BUCK | 8 ++
hbase-native-client/core/action.h | 45 ++++++++++
hbase-native-client/core/get-test.cc | 5 +-
hbase-native-client/core/get.cc | 15 +---
hbase-native-client/core/get.h | 18 +---
hbase-native-client/core/multi-response.cc | 80 ++++++++++++++++++
hbase-native-client/core/multi-response.h | 81 ++++++++++++++++++
hbase-native-client/core/raw-async-table.cc | 2 +-
hbase-native-client/core/region-request.h | 48 +++++++++++
hbase-native-client/core/region-result.cc | 54 ++++++++++++
hbase-native-client/core/region-result.h | 55 ++++++++++++
hbase-native-client/core/request-converter.cc | 82 ++++++++++++------
hbase-native-client/core/request-converter.h | 12 +++
hbase-native-client/core/response-converter.cc | 94 ++++++++++++++++++++-
hbase-native-client/core/response-converter.h | 7 ++
hbase-native-client/core/row.h | 62 ++++++++++++++
hbase-native-client/core/server-request.h | 59 +++++++++++++
19 files changed, 671 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/connection/request.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.cc b/hbase-native-client/connection/request.cc
index 189130e..80883cc 100644
--- a/hbase-native-client/connection/request.cc
+++ b/hbase-native-client/connection/request.cc
@@ -39,3 +39,7 @@ std::unique_ptr<Request> Request::scan() {
return std::make_unique<Request>(std::make_shared<hbase::pb::ScanRequest>(),
std::make_shared<hbase::pb::ScanResponse>(), "Scan");
}
+// Creates a request object for a multi: a fresh MultiRequest message paired
+// with a fresh MultiResponse message, tagged with the method name "Multi".
+std::unique_ptr<Request> Request::multi() {
+  auto multi_req = std::make_shared<hbase::pb::MultiRequest>();
+  auto multi_resp = std::make_shared<hbase::pb::MultiResponse>();
+  return std::make_unique<Request>(multi_req, multi_resp, "Multi");
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/connection/request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/request.h b/hbase-native-client/connection/request.h
index 91c684d..520b380 100644
--- a/hbase-native-client/connection/request.h
+++ b/hbase-native-client/connection/request.h
@@ -39,6 +39,8 @@ class Request {
static std::unique_ptr<Request> mutate();
/** Create a request object for a scan */
static std::unique_ptr<Request> scan();
+ /** Create a request object for a multi */
+ static std::unique_ptr<Request> multi();
/**
* This should be private. Do not use this.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 2d77f2d..7483980 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -45,6 +45,12 @@ cxx_library(
"async-rpc-retrying-caller.h",
"hbase-rpc-controller.h",
"zk-util.h",
+ "action.h",
+ "multi-response.h",
+ "region-request.h",
+ "region-result.h",
+ "row.h",
+ "server-request.h",
],
srcs=[
"async-connection.cc",
@@ -62,6 +68,8 @@ cxx_library(
"response-converter.cc",
"table.cc",
"zk-util.cc",
+ "multi-response.cc",
+ "region-result.cc",
],
deps=[
"//exceptions:exceptions",
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/action.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h
new file mode 100644
index 0000000..3511683
--- /dev/null
+++ b/hbase-native-client/core/action.h
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include "core/row.h"
+
+using hbase::Row;
+namespace hbase {
+
+/**
+ * Couples a Row operation with the index that operation had in the
+ * caller's original list of requests.
+ */
+class Action {
+ public:
+  /**
+   * @param action the Row operation being wrapped
+   * @param original_index index of the operation in the original request list
+   */
+  Action(std::shared_ptr<Row> action, int original_index)
+      : action_(action), original_index_(original_index) {}
+  ~Action() {}
+
+  /** @return the wrapped Row operation */
+  std::shared_ptr<Row> action() const { return action_; }
+
+  /** @return the index supplied at construction */
+  int64_t original_index() const { return original_index_; }
+
+ private:
+  std::shared_ptr<Row> action_;
+  int64_t original_index_;
+  // Neither of the two fields below is read or written by any member of this class.
+  int64_t nonce_ = -1;
+  int32_t replica_id_ = -1;
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get-test.cc b/hbase-native-client/core/get-test.cc
index 07d0003..6ee2715 100644
--- a/hbase-native-client/core/get-test.cc
+++ b/hbase-native-client/core/get-test.cc
@@ -21,7 +21,8 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-using namespace hbase;
+
+using hbase::Get;
const int NUMBER_OF_GETS = 5;
void CheckFamilies(Get &get) {
@@ -102,7 +103,7 @@ void CheckFamiliesAfterCopy(Get &get) {
}
void GetMethods(Get &get, const std::string &row) {
- EXPECT_EQ(row, get.Row());
+ EXPECT_EQ(row, get.row());
CheckFamilies(get);
EXPECT_EQ(true, get.CacheBlocks());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get.cc b/hbase-native-client/core/get.cc
index 5c5f446..afeb429 100644
--- a/hbase-native-client/core/get.cc
+++ b/hbase-native-client/core/get.cc
@@ -26,7 +26,7 @@ namespace hbase {
Get::~Get() {}
-Get::Get(const std::string &row) : row_(row) { CheckRow(row_); }
+Get::Get(const std::string &row) : Row(row) {}
Get::Get(const Get &get) {
row_ = get.row_;
@@ -78,8 +78,6 @@ Get &Get::AddColumn(const std::string &family, const std::string &qualifier) {
return *this;
}
-const std::string &Get::Row() const { return row_; }
-
hbase::pb::Consistency Get::Consistency() const { return consistency_; }
Get &Get::SetConsistency(hbase::pb::Consistency consistency) {
@@ -119,15 +117,4 @@ Get &Get::SetTimeStamp(int64_t timestamp) {
const TimeRange &Get::Timerange() const { return *tr_; }
-void Get::CheckRow(const std::string &row) {
- const int kMaxRowLength = std::numeric_limits<int16_t>::max();
- int row_length = row.size();
- if (0 == row_length) {
- throw std::runtime_error("Row length can't be 0");
- }
- if (row_length > kMaxRowLength) {
- throw std::runtime_error("Length of " + row + " is greater than max row size: " +
- std::to_string(kMaxRowLength));
- }
-}
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/get.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/get.h b/hbase-native-client/core/get.h
index 5492f21..e0be4e7 100644
--- a/hbase-native-client/core/get.h
+++ b/hbase-native-client/core/get.h
@@ -25,9 +25,11 @@
#include <string>
#include <vector>
#include "core/query.h"
+#include "core/row.h"
#include "core/time-range.h"
#include "if/Client.pb.h"
+using hbase::Row;
namespace hbase {
/**
@@ -36,7 +38,7 @@ namespace hbase {
*/
using FamilyMap = std::map<std::string, std::vector<std::string>>;
-class Get : public Query {
+class Get : public Row, public Query {
public:
/**
* Constructors
@@ -110,11 +112,6 @@ class Get : public Query {
Get& AddColumn(const std::string& family, const std::string& qualifier);
/**
- * @brief Returns the row for this Get operation
- */
- const std::string& Row() const;
-
- /**
* @brief Returns true if family map (FamilyMap) is non empty false otherwise
*/
bool HasFamilies() const;
@@ -131,21 +128,12 @@ class Get : public Query {
Get& SetConsistency(hbase::pb::Consistency consistency);
private:
- std::string row_ = "";
int32_t max_versions_ = 1;
bool cache_blocks_ = true;
bool check_existence_only_ = false;
FamilyMap family_map_;
hbase::pb::Consistency consistency_ = hbase::pb::Consistency::STRONG;
std::unique_ptr<TimeRange> tr_ = std::make_unique<TimeRange>();
-
- /**
- * @brief Checks if the row for this Get operation is proper or not
- * @param row Row to check
- * @throws std::runtime_error if row is empty or greater than
- * MAX_ROW_LENGTH(i.e. std::numeric_limits<short>::max())
- */
- void CheckRow(const std::string& row);
};
} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/multi-response.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/multi-response.cc b/hbase-native-client/core/multi-response.cc
new file mode 100644
index 0000000..562f3b6
--- /dev/null
+++ b/hbase-native-client/core/multi-response.cc
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/multi-response.h"
+#include "core/region-result.h"
+
+namespace hbase {
+
+// Default constructor; both internal maps start out empty.
+MultiResponse::MultiResponse() {}
+
+// Adds up the result-or-exception counts of every region stored in results_.
+int MultiResponse::Size() const {
+  int total = 0;
+  for (auto it = results_.begin(); it != results_.end(); ++it) {
+    total += it->second->ResultOrExceptionSize();
+  }
+  return total;
+}
+
+// Records the result (or exception) of the action at original_index under
+// region_name, creating the per-region container on first use.
+//
+// A single keyed lookup replaces the previous linear scan over the map. An
+// entry that exists but holds no RegionResult is repaired rather than
+// dereferenced.
+//
+// Throws whatever RegionResult::AddResultOrException throws (std::runtime_error
+// when original_index was already recorded for this region).
+void MultiResponse::AddRegionResult(const std::string& region_name, int32_t original_index,
+                                    std::shared_ptr<Result> result,
+                                    std::shared_ptr<std::exception> exc) {
+  auto& region_result = results_[region_name];
+  if (!region_result) {
+    region_result = std::make_shared<RegionResult>();
+  }
+  region_result->AddResultOrException(original_index, result, exc);
+}
+
+// Stores the region-level exception for region_name, replacing any exception
+// previously stored under the same name.
+void MultiResponse::AddRegionException(const std::string& region_name,
+ std::shared_ptr<std::exception> exception) {
+ exceptions_[region_name] = exception;
+}
+
+// Returns the exception stored for region_name, or a null pointer when no
+// exception has been stored for that region. A missing region previously
+// surfaced as std::out_of_range from std::map::at().
+std::shared_ptr<std::exception> MultiResponse::RegionException(
+    const std::string& region_name) const {
+  auto found = exceptions_.find(region_name);
+  if (found == exceptions_.end()) {
+    return nullptr;
+  }
+  return found->second;
+}
+
+// Read-only access to the map of region name -> region-level exception.
+const std::map<std::string, std::shared_ptr<std::exception> >& MultiResponse::RegionExceptions()
+ const {
+ return exceptions_;
+}
+
+// Attaches load statistics to the results of region_name.
+//
+// std::map::operator[] default-inserts a null shared_ptr for an unknown key,
+// so the per-region container is created here when it does not exist yet
+// instead of being dereferenced while null.
+void MultiResponse::AddStatistic(const std::string& region_name,
+                                 std::shared_ptr<RegionLoadStats> stat) {
+  auto& region_result = results_[region_name];
+  if (!region_result) {
+    region_result = std::make_shared<RegionResult>();
+  }
+  region_result->set_stat(stat);
+}
+
+// Read-only access to the map of region name -> per-region results.
+const std::map<std::string, std::shared_ptr<RegionResult> >& MultiResponse::RegionResults() const {
+ return results_;
+}
+
+// Empty destructor body; members are released by their own destructors.
+MultiResponse::~MultiResponse() {}
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/multi-response.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/multi-response.h b/hbase-native-client/core/multi-response.h
new file mode 100644
index 0000000..cebd2b7
--- /dev/null
+++ b/hbase-native-client/core/multi-response.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <core/region-result.h>
+#include <exception>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "core/result.h"
+#include "if/Client.pb.h"
+
+using hbase::RegionResult;
+using hbase::Result;
+using hbase::pb::RegionLoadStats;
+
+namespace hbase {
+
+/**
+ * Container for the outcome of a multi request: per-region results keyed by
+ * region name, plus exceptions reported for a region as a whole.
+ */
+class MultiResponse {
+ public:
+ MultiResponse();
+ /**
+ * @brief Returns the total number of result-or-exception entries across all regions
+ */
+ int Size() const;
+
+ /**
+ * Add the result or exception to the container, grouped by the region name
+ *
+ * @param region_name name of the region the entry belongs to
+ * @param original_index the original index of the Action (request).
+ * @param result the result; will be empty for successful Put and Delete actions.
+ * @param exc the error, if any.
+ */
+ void AddRegionResult(const std::string& region_name, int32_t original_index,
+ std::shared_ptr<Result> result, std::shared_ptr<std::exception> exc);
+
+ /**
+ * Stores an exception reported for the region itself, replacing any
+ * exception previously stored for region_name.
+ */
+ void AddRegionException(const std::string& region_name,
+ std::shared_ptr<std::exception> exception);
+
+ /**
+ * @return the exception for the region, if any.
+ * NOTE(review): the intended contract is to return null when no exception is
+ * stored; confirm the implementation in multi-response.cc honors this.
+ */
+ std::shared_ptr<std::exception> RegionException(const std::string& region_name) const;
+
+ /**
+ * @return the map of region name to region-level exception
+ */
+ const std::map<std::string, std::shared_ptr<std::exception>>& RegionExceptions() const;
+
+ /**
+ * Sets the load statistics on the results stored for region_name.
+ * NOTE(review): confirm the behavior when no result has been added for
+ * region_name yet.
+ */
+ void AddStatistic(const std::string& region_name, std::shared_ptr<RegionLoadStats> stat);
+
+ /**
+ * @return the map of region name to per-region results
+ */
+ const std::map<std::string, std::shared_ptr<RegionResult>>& RegionResults() const;
+
+ ~MultiResponse();
+
+ private:
+ // map of regionName to map of Results by the original index for that Result
+ std::map<std::string, std::shared_ptr<hbase::RegionResult>> results_;
+ /**
+ * The server can send us a failure for the region itself, instead of individual failure.
+ * It's a part of the protobuf definition.
+ */
+ std::map<std::string, std::shared_ptr<std::exception>> exceptions_;
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 88a3382..9a680ed 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -56,7 +56,7 @@ folly::Future<RESP> RawAsyncTable::Call(
Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) {
auto caller =
- CreateCallerBuilder<std::shared_ptr<Result>>(get.Row(), connection_conf_->read_rpc_timeout())
+ CreateCallerBuilder<std::shared_ptr<Result>>(get.row(), connection_conf_->read_rpc_timeout())
->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller,
std::shared_ptr<hbase::RegionLocation> loc,
std::shared_ptr<hbase::RpcClient> rpc_client)
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-request.h b/hbase-native-client/core/region-request.h
new file mode 100644
index 0000000..6f29d44
--- /dev/null
+++ b/hbase-native-client/core/region-request.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+#include <memory>
+#include <queue>
+#include <vector>
+#include "core/action.h"
+#include "core/region-location.h"
+
+using hbase::Action;
+namespace hbase {
+
+/**
+ * Groups the Actions destined for a single region together with that
+ * region's location.
+ */
+class RegionRequest {
+ public:
+  // NOTE: ActionList is a plain std::vector; this class adds no
+  // synchronization of its own around it.
+  using ActionList = std::vector<std::shared_ptr<Action>>;
+
+  /** @param region_loc location of the region the actions are destined for */
+  explicit RegionRequest(const std::shared_ptr<hbase::RegionLocation> &region_loc)
+      : region_loc_(region_loc) {}
+  ~RegionRequest() {}
+
+  /** Appends action to the end of the action list. */
+  void AddAction(std::shared_ptr<Action> action) { actions_.push_back(action); }
+
+  /** @return the region location supplied at construction */
+  std::shared_ptr<hbase::RegionLocation> region_location() const { return region_loc_; }
+
+  /** @return the actions added so far, in insertion order */
+  const ActionList &actions() const { return actions_; }
+
+ private:
+  std::shared_ptr<hbase::RegionLocation> region_loc_;
+  ActionList actions_;
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-result.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-result.cc b/hbase-native-client/core/region-result.cc
new file mode 100644
index 0000000..d9ab942
--- /dev/null
+++ b/hbase-native-client/core/region-result.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/region-result.h"
+#include <glog/logging.h>
+#include <stdexcept>
+
+using hbase::Result;
+using hbase::pb::RegionLoadStats;
+
+namespace hbase {
+
+// Default constructor; no entries and no statistics are set.
+RegionResult::RegionResult() {}
+
+// Empty destructor body; members are released by their own destructors.
+RegionResult::~RegionResult() {}
+
+// Stores the (result, exception) pair for the given index.
+// Throws std::runtime_error when an entry for index is already present.
+void RegionResult::AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
+                                        std::shared_ptr<std::exception> exc) {
+  if (result_or_excption_.count(index) != 0) {
+    throw std::runtime_error("Index " + std::to_string(index) +
+                             " already set with ResultOrException");
+  }
+  result_or_excption_[index] = std::make_tuple(result, exc);
+}
+
+// Replaces the stored load statistics with stat.
+void RegionResult::set_stat(std::shared_ptr<RegionLoadStats> stat) { stat_ = stat; }
+
+// Number of (result, exception) entries stored; the map's size is converted to int.
+int RegionResult::ResultOrExceptionSize() const { return result_or_excption_.size(); }
+
+// Returns a newly allocated copy of the tuple stored for index.
+// The lookup uses std::map::at(), which throws std::out_of_range when no
+// entry exists for index.
+std::shared_ptr<ResultOrExceptionTuple> RegionResult::ResultOrException(int32_t index) const {
+ return std::make_shared<ResultOrExceptionTuple>(result_or_excption_.at(index));
+}
+
+// Returns a reference to the stored load statistics pointer (null until set_stat is called).
+const std::shared_ptr<RegionLoadStats>& RegionResult::stat() const { return stat_; }
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/region-result.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-result.h b/hbase-native-client/core/region-result.h
new file mode 100644
index 0000000..9b7ca03
--- /dev/null
+++ b/hbase-native-client/core/region-result.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <tuple>
+#include "core/result.h"
+#include "if/Client.pb.h"
+
+using hbase::Result;
+using hbase::pb::RegionLoadStats;
+
+namespace hbase {
+// Pair of (result, exception) recorded for one action.
+using ResultOrExceptionTuple =
+ std::tuple<std::shared_ptr<hbase::Result>, std::shared_ptr<std::exception>>;
+/**
+ * Results of the actions sent to one region, keyed by action index, plus the
+ * region's load statistics.
+ */
+class RegionResult {
+ public:
+ RegionResult();
+ /**
+ * Stores the result/exception pair for index.
+ * @throws std::runtime_error if an entry for index already exists
+ */
+ void AddResultOrException(int32_t index, std::shared_ptr<hbase::Result> result,
+ std::shared_ptr<std::exception> exc);
+
+ /** Replaces the stored load statistics. */
+ void set_stat(std::shared_ptr<RegionLoadStats> stat);
+
+ /** @return the number of stored result/exception entries */
+ int ResultOrExceptionSize() const;
+
+ /**
+ * @return a copy of the tuple stored for index
+ * @throws std::out_of_range if no entry exists for index
+ */
+ std::shared_ptr<ResultOrExceptionTuple> ResultOrException(int32_t index) const;
+
+ /** @return the stored load statistics (null until set_stat is called) */
+ const std::shared_ptr<RegionLoadStats>& stat() const;
+
+ ~RegionResult();
+
+ private:
+ // index -> (result, exception)
+ std::map<int, ResultOrExceptionTuple> result_or_excption_;
+ std::shared_ptr<RegionLoadStats> stat_;
+};
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 227e04a..ff92b5c 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -23,6 +23,7 @@
using hbase::Request;
using hbase::pb::GetRequest;
+using hbase::pb::RegionAction;
using hbase::pb::RegionSpecifier;
using hbase::pb::RegionSpecifier_RegionSpecifierType;
using hbase::pb::ScanRequest;
@@ -43,35 +44,9 @@ void RequestConverter::SetRegion(const std::string &region_name,
std::unique_ptr<Request> RequestConverter::ToGetRequest(const Get &get,
const std::string &region_name) {
auto pb_req = Request::get();
-
auto pb_msg = std::static_pointer_cast<GetRequest>(pb_req->req_msg());
RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
-
- auto pb_get = pb_msg->mutable_get();
- pb_get->set_max_versions(get.MaxVersions());
- pb_get->set_cache_blocks(get.CacheBlocks());
- pb_get->set_consistency(get.Consistency());
-
- if (!get.Timerange().IsAllTime()) {
- hbase::pb::TimeRange *pb_time_range = pb_get->mutable_time_range();
- pb_time_range->set_from(get.Timerange().MinTimeStamp());
- pb_time_range->set_to(get.Timerange().MaxTimeStamp());
- }
- pb_get->set_row(get.Row());
- if (get.HasFamilies()) {
- for (const auto &family : get.Family()) {
- auto column = pb_get->add_column();
- column->set_family(family.first);
- for (const auto &qualifier : family.second) {
- column->add_qualifier(qualifier);
- }
- }
- }
-
- if (get.filter() != nullptr) {
- pb_get->set_allocated_filter(Filter::ToProto(*(get.filter())).release());
- }
-
+ pb_msg->set_allocated_get((RequestConverter::ToGet(get)).release());
return pb_req;
}
@@ -123,4 +98,57 @@ std::unique_ptr<Request> RequestConverter::ToScanRequest(const Scan &scan,
return pb_req;
}
+
+/**
+ * Builds a "Multi" request containing one RegionAction per entry of
+ * actions_by_region.
+ *
+ * For each entry the key is passed to SetRegion() and one protobuf Action is
+ * appended per hbase::Action of the entry. Only actions whose Row is a Get are
+ * populated (with the converted Get and the action's position within its
+ * region); any other Row type leaves the appended protobuf Action empty.
+ *
+ * @param actions_by_region actions grouped by region
+ * @return the populated request
+ */
+std::unique_ptr<Request> RequestConverter::ToMultiRequest(
+    const ActionsByRegion &actions_by_region) {
+  auto pb_req = Request::multi();
+  auto pb_msg = std::static_pointer_cast<hbase::pb::MultiRequest>(pb_req->req_msg());
+
+  for (const auto &action_by_region : actions_by_region) {
+    auto pb_region_action = pb_msg->add_regionaction();
+    RequestConverter::SetRegion(action_by_region.first, pb_region_action->mutable_region());
+    int action_num = 0;
+    for (const auto &region_action : action_by_region.second->actions()) {
+      auto pb_action = pb_region_action->add_action();
+      auto action = region_action->action();
+      if (auto pget = std::dynamic_pointer_cast<Get>(action)) {
+        // The protobuf action takes ownership of the released Get message.
+        pb_action->set_allocated_get(RequestConverter::ToGet(*pget).release());
+        pb_action->set_index(action_num);
+      }
+      // Counted for every action so indexes match positions in the region's action list.
+      action_num++;
+    }
+  }
+
+  VLOG(3) << "Multi Req:-" << pb_req->req_msg()->ShortDebugString();
+  return pb_req;
+}
+
+/**
+ * Converts a client-side Get into its protobuf representation: max versions,
+ * block caching, consistency, row, an optional time range, the requested
+ * families/qualifiers and an optional filter.
+ */
+std::unique_ptr<hbase::pb::Get> RequestConverter::ToGet(const Get &get) {
+  auto proto = std::make_unique<hbase::pb::Get>();
+  proto->set_max_versions(get.MaxVersions());
+  proto->set_cache_blocks(get.CacheBlocks());
+  proto->set_consistency(get.Consistency());
+
+  // The time range is only serialized when it is restricted.
+  if (!get.Timerange().IsAllTime()) {
+    auto *proto_range = proto->mutable_time_range();
+    proto_range->set_from(get.Timerange().MinTimeStamp());
+    proto_range->set_to(get.Timerange().MaxTimeStamp());
+  }
+
+  proto->set_row(get.row());
+
+  if (get.HasFamilies()) {
+    for (const auto &family_entry : get.Family()) {
+      auto *proto_column = proto->add_column();
+      proto_column->set_family(family_entry.first);
+      for (const auto &qualifier_name : family_entry.second) {
+        proto_column->add_qualifier(qualifier_name);
+      }
+    }
+  }
+
+  // The protobuf Get takes ownership of the released filter message.
+  if (get.filter() != nullptr) {
+    proto->set_allocated_filter(Filter::ToProto(*(get.filter())).release());
+  }
+  return proto;
+}
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 57f08cc..003afaa 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -21,14 +21,23 @@
#include <memory>
#include <string>
+#include <vector>
#include "connection/request.h"
+#include "core/action.h"
#include "core/get.h"
+#include "core/region-request.h"
#include "core/scan.h"
+#include "core/server-request.h"
#include "if/HBase.pb.h"
using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionAction;
+using hbase::pb::ServerName;
+using hbase::ServerRequest;
+
namespace hbase {
+using ActionsByRegion = ServerRequest::ActionsByRegion;
/**
* RequestConverter class
* This class converts a Client side Get, Scan, Mutate operation to corresponding PB message.
@@ -53,6 +62,8 @@ class RequestConverter {
*/
static std::unique_ptr<Request> ToScanRequest(const Scan &scan, const std::string &region_name);
+ static std::unique_ptr<Request> ToMultiRequest(const ActionsByRegion &region_requests);
+
private:
// Constructor not required. We have all static methods to create PB requests.
RequestConverter();
@@ -64,6 +75,7 @@ class RequestConverter {
* Request.
*/
static void SetRegion(const std::string &region_name, RegionSpecifier *region_specifier);
+ static std::unique_ptr<hbase::pb::Get> ToGet(const Get &get);
};
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index b2fff34..7729257 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -19,13 +19,18 @@
#include "core/response-converter.h"
+#include <glog/logging.h>
+#include <stdexcept>
#include <string>
+#include <utility>
#include <vector>
-
#include "core/cell.h"
+#include "core/multi-response.h"
+#include "exceptions/exception.h"
using hbase::pb::GetResponse;
using hbase::pb::ScanResponse;
+using hbase::pb::RegionLoadStats;
namespace hbase {
@@ -37,6 +42,7 @@ ResponseConverter::~ResponseConverter() {}
// go inside folly::Future's, making the move semantics extremely tricky.
std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) {
auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg());
+ VLOG(3) << "FromGetResponse:" << get_resp->ShortDebugString();
return ToResult(get_resp->result(), resp.cell_scanner());
}
@@ -52,16 +58,24 @@ std::shared_ptr<Result> ResponseConverter::ToResult(
// iterate over the cells coming from rpc codec
if (cell_scanner != nullptr) {
- while (cell_scanner->Advance()) {
+ // Read exactly as many cells as this result declares.
+ int cells_read = 0;
+ while (cells_read != result.associated_cell_count()) {
+ if (cell_scanner->Advance()) {
vcells.push_back(cell_scanner->Current());
+ cells_read += 1;
+ } else {
+ LOG(ERROR)<< "CellScanner::Advance() returned false unexpectedly. Cells Read:- "
+ << cells_read << "; Expected Cell Count:- " << result.associated_cell_count();
+ // Must be thrown: merely constructing the exception would leave this loop
+ // spinning forever, since cells_read can no longer advance.
+ throw std::runtime_error("CellScanner::Advance() returned false unexpectedly");
+ }
}
- // TODO: check associated cell count?
}
return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial());
}
std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) {
auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg());
+ VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString();
int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size()
: scan_resp->results_size();
@@ -94,4 +108,78 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R
return results;
}
+
+/**
+ * Converts a protobuf MultiResponse into an hbase::MultiResponse, pairing each
+ * RegionActionResult with the RegionAction at the same index in the request.
+ *
+ * @param req Request whose req_msg() is treated as an hbase::pb::MultiRequest.
+ * @param resp Response whose resp_msg() is treated as an hbase::pb::MultiResponse; its
+ *             cell_scanner() is handed to ToResult() for every returned result.
+ * @return the per-region results, exceptions and load statistics found in the response.
+ * @throws std::runtime_error if the request and response region-action counts differ, if a
+ *         region specifier carries a type other than REGION_NAME, or if a region's action
+ *         count differs from its result count.
+ */
+std::unique_ptr<hbase::MultiResponse> ResponseConverter::GetResults(std::shared_ptr<Request> req,
+                                                                    const Response& resp) {
+  auto multi_req = std::static_pointer_cast<hbase::pb::MultiRequest>(req->req_msg());
+  auto multi_resp = std::static_pointer_cast<hbase::pb::MultiResponse>(resp.resp_msg());
+  VLOG(3) << "GetResults:" << multi_resp->ShortDebugString();
+  const int req_region_action_count = multi_req->regionaction_size();
+  const int res_region_action_count = multi_resp->regionactionresult_size();
+  if (req_region_action_count != res_region_action_count) {
+    throw std::runtime_error(
+        "Request mutation count=" + std::to_string(req_region_action_count)
+        + " does not match response mutation result count="
+        + std::to_string(res_region_action_count));
+  }
+  auto multi_response = std::make_unique<hbase::MultiResponse>();
+  for (int32_t num = 0; num < res_region_action_count; num++) {
+    // Bind by const reference: multi_req / multi_resp own the messages for the whole call,
+    // so there is no need to deep-copy each protobuf message per region.
+    const hbase::pb::RegionAction& actions = multi_req->regionaction(num);
+    const hbase::pb::RegionActionResult& action_result = multi_resp->regionactionresult(num);
+    const hbase::pb::RegionSpecifier& rs = actions.region();
+    if (rs.has_type() && rs.type() != hbase::pb::RegionSpecifier::REGION_NAME) {
+      throw std::runtime_error("We support only encoded types for protobuf multi response.");
+    }
+
+    const auto& region_name = rs.value();
+    // A region-level exception replaces all per-action results for that region.
+    if (action_result.has_exception()) {
+      if (action_result.exception().has_value()) {
+        auto exc = std::make_shared<hbase::IOException>(action_result.exception().value());
+        VLOG(8) << "Store Region Exception:- " << exc->what();
+        multi_response->AddRegionException(region_name, exc);
+      }
+      continue;
+    }
+
+    if (actions.action_size() != action_result.resultorexception_size()) {
+      throw std::runtime_error(
+          "actions.action_size=" + std::to_string(actions.action_size())
+          + ", action_result.resultorexception_size="
+          + std::to_string(action_result.resultorexception_size()) + " for region "
+          + actions.region().value());
+    }
+
+    for (const hbase::pb::ResultOrException& roe : action_result.resultorexception()) {
+      std::shared_ptr<Result> result;
+      std::shared_ptr<std::exception> exc;
+      if (roe.has_exception()) {
+        if (roe.exception().has_value()) {
+          exc = std::make_shared<hbase::IOException>(roe.exception().value());
+          VLOG(8) << "Store ResultOrException:- " << exc->what();
+        }
+      } else if (roe.has_result()) {
+        result = ToResult(roe.result(), resp.cell_scanner());
+      } else if (roe.has_service_result()) {
+        // TODO Not processing Coprocessor Service Result;
+      } else {
+        // Sometimes, the response is just "it was processed". Generally, this occurs for things
+        // like mutateRows where either we get back 'processed' (or not) and optionally some
+        // statistics about the regions we touched.
+        std::vector<std::shared_ptr<Cell>> empty_cells;
+        result = std::make_shared<Result>(empty_cells, multi_resp->processed(), false, false);
+      }
+      // Either result or exc (or both) may still be null here; both are stored as-is.
+      multi_response->AddRegionResult(region_name, roe.index(), std::move(result), exc);
+    }
+  }
+
+  if (multi_resp->has_regionstatistics()) {
+    const hbase::pb::MultiRegionLoadStats& stats = multi_resp->regionstatistics();
+    // region(i) and stat(i) are parallel repeated fields; bound the index by both so a
+    // response with fewer stats than regions cannot index past the end of stat.
+    for (int i = 0; i < stats.region_size() && i < stats.stat_size(); i++) {
+      multi_response->AddStatistic(stats.region(i).value(),
+                                   std::make_shared<RegionLoadStats>(stats.stat(i)));
+    }
+  }
+  return multi_response;
+}
} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/response-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index 743c14b..a5095fd 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -21,11 +21,15 @@
#include <memory>
#include <vector>
+#include "connection/request.h"
#include "connection/response.h"
+#include "core/multi-response.h"
#include "core/result.h"
#include "if/Client.pb.h"
#include "serde/cell-scanner.h"
+using hbase::Request;
+using hbase::Response;
namespace hbase {
/**
@@ -47,6 +51,9 @@ class ResponseConverter {
static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
+ static std::unique_ptr<hbase::MultiResponse> GetResults(std::shared_ptr<Request> req,
+ const Response& resp);
+
private:
// Constructor not required. We have all static methods to extract response from PB messages.
ResponseConverter();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/row.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/row.h b/hbase-native-client/core/row.h
new file mode 100644
index 0000000..2c7bdd1
--- /dev/null
+++ b/hbase-native-client/core/row.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <limits>
+#include <stdexcept>
+#include <string>
+
+#pragma once
+
+namespace hbase {
+
+/**
+ * @brief Holds the row key an operation is addressed to.
+ */
+class Row {
+ public:
+  Row() {}
+  explicit Row(const std::string &row) : row_(row) { CheckRow(row_); }
+  virtual ~Row() {}
+
+  /**
+   * @brief Returns the stored row key.
+   */
+  const std::string &row() const { return row_; }
+
+ private:
+  /**
+   * @brief Validates a row key.
+   * @param row Row key to validate
+   * @throws std::runtime_error if row is empty or longer than
+   * std::numeric_limits<int16_t>::max()
+   */
+  void CheckRow(const std::string &row) {
+    if (row.empty()) {
+      throw std::runtime_error("Row length can't be 0");
+    }
+
+    const int16_t max_row_length = std::numeric_limits<int16_t>::max();
+    if (row.size() > max_row_length) {
+      std::string message = "Length of " + row + " is greater than max row size: ";
+      message += std::to_string(max_row_length);
+      throw std::runtime_error(message);
+    }
+  }
+
+ protected:
+  std::string row_ = "";
+};
+
+} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c7a8b9d/hbase-native-client/core/server-request.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/server-request.h b/hbase-native-client/core/server-request.h
new file mode 100644
index 0000000..827b2e7
--- /dev/null
+++ b/hbase-native-client/core/server-request.h
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include "core/action.h"
+#include "core/region-location.h"
+#include "core/region-request.h"
+
+using hbase::Action;
+using hbase::RegionRequest;
+
+namespace hbase {
+
+/**
+ * Collects the actions of one request, grouped by the name of the region they target.
+ */
+class ServerRequest {
+ public:
+  // Region name -> the RegionRequest accumulating that region's actions.
+  // NOTE(review): std::map is not a concurrent container; callers presumably serialize
+  // access to a ServerRequest - confirm.
+  using ActionsByRegion = std::map<std::string, std::shared_ptr<RegionRequest>>;
+
+  /**
+   * Creates the request with an (initially action-less) entry for region_location's region.
+   */
+  explicit ServerRequest(std::shared_ptr<RegionLocation> region_location) {
+    auto region_name = region_location->region_name();
+    auto region_request = std::make_shared<RegionRequest>(region_location);
+    actions_by_region_[region_name] = region_request;
+  }
+  ~ServerRequest() {}
+
+  /**
+   * Adds action to the RegionRequest of region_location's region, creating that
+   * RegionRequest first if this is the first action seen for the region.
+   */
+  void AddActionsByRegion(std::shared_ptr<RegionLocation> region_location,
+                          std::shared_ptr<Action> action) {
+    auto region_name = region_location->region_name();
+    auto entry = actions_by_region_.find(region_name);
+    if (entry == actions_by_region_.end()) {
+      // map::at() would throw std::out_of_range here, which made it impossible to add
+      // any region other than the one given to the constructor.
+      entry = actions_by_region_
+                  .emplace(region_name, std::make_shared<RegionRequest>(region_location))
+                  .first;
+    }
+    entry->second->AddAction(action);
+  }
+
+  const ActionsByRegion &actions_by_region() const { return actions_by_region_; }
+
+ private:
+  ActionsByRegion actions_by_region_;
+};
+} /* namespace hbase */