You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2017/09/15 21:20:27 UTC
[10/25] hbase git commit: HBASE-18725 [C++] Install header files as
well as library
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/security/user.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h
deleted file mode 100644
index 307fc61..0000000
--- a/hbase-native-client/security/user.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <glog/logging.h>
-#include <mutex>
-#include <string>
-#include "core/configuration.h"
-
-namespace hbase {
-namespace security {
-static constexpr const char* kKerberos = "kerberos";
-class User {
- public:
- explicit User(const std::string& user_name) : user_name_(user_name) {}
- virtual ~User() = default;
-
- std::string user_name() { return user_name_; }
-
- static std::shared_ptr<User> defaultUser() { return std::make_shared<User>("__drwho"); }
-
- static bool IsSecurityEnabled(const Configuration& conf) {
- return conf.Get("hbase.security.authentication", "").compare(kKerberos) == 0;
- }
-
- private:
- std::string user_name_;
-};
-}
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
deleted file mode 100644
index a765884..0000000
--- a/hbase-native-client/serde/BUCK
+++ /dev/null
@@ -1,96 +0,0 @@
-##
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-cxx_library(
- name="serde",
- exported_headers=[
- "cell-scanner.h",
- "cell-outputstream.h",
- "codec.h",
- "region-info.h",
- "rpc-serde.h",
- "server-name.h",
- "table-name.h",
- "zk.h",
- ],
- srcs=[
- "rpc-serde.cc",
- "zk.cc",
- ],
- deps=[
- "//if:if", "//third-party:folly", "//utils:utils", "//security:security"
- ],
- tests=[
- ":client-deserializer-test",
- ":client-serializer-test",
- ":server-name-test",
- ":table-name-test",
- ":zk-deserializer-test",
- ":region-info-deserializer-test",
- ],
- compiler_flags=['-Weffc++'],
- visibility=[
- 'PUBLIC',
- ],)
-cxx_test(
- name="table-name-test",
- srcs=[
- "table-name-test.cc",
- ],
- deps=[
- ":serde",
- ],)
-cxx_test(
- name="server-name-test",
- srcs=[
- "server-name-test.cc",
- ],
- deps=[
- ":serde",
- ],)
-cxx_test(
- name="client-serializer-test",
- srcs=[
- "client-serializer-test.cc",
- ],
- deps=[
- ":serde",
- ],)
-cxx_test(
- name="client-deserializer-test",
- srcs=[
- "client-deserializer-test.cc",
- ],
- deps=[
- ":serde",
- ],)
-cxx_test(
- name="zk-deserializer-test",
- srcs=[
- "zk-deserializer-test.cc",
- ],
- deps=[
- ":serde",
- ],)
-cxx_test(
- name="region-info-deserializer-test",
- srcs=[
- "region-info-deserializer-test.cc",
- ],
- deps=[
- ":serde",
- ],)
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/cell-outputstream.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/cell-outputstream.h b/hbase-native-client/serde/cell-outputstream.h
deleted file mode 100644
index 963dd31..0000000
--- a/hbase-native-client/serde/cell-outputstream.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <memory>
-
-namespace hbase {
-
-class Cell;
-
-/**
- * @brief Encoder / Decoder for Cells.
- */
-class CellOutputStream {
- public:
- virtual ~CellOutputStream() {}
-
- /**
- * Implementation must copy the entire state of the Cell. If the written Cell is modified
- * immediately after the write method returns, the modifications must have absolutely no effect
- * on the copy of the Cell that was added in the write.
- * @param cell Cell to write out
- * @throws IOException
- */
- virtual void Write(const Cell& cell) = 0;
-
- /**
- * Let the implementation decide what to do. Usually means writing accumulated data into a
- * byte[] that can then be read from the implementation to be sent to disk, put in the block
- * cache, or sent over the network.
- * @throws IOException
- */
- virtual void Flush() = 0;
-};
-
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/cell-scanner.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/cell-scanner.h b/hbase-native-client/serde/cell-scanner.h
deleted file mode 100644
index fe4a249..0000000
--- a/hbase-native-client/serde/cell-scanner.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-#include <folly/io/IOBuf.h>
-#include <memory>
-
-namespace hbase {
-
-class Cell;
-
-/**
- * @brief Interface for iterating over a sequence of Cells
- */
-class CellScanner {
- public:
- virtual ~CellScanner() {}
-
- /**
- * @brief This method will be used to iterate the cells.
- * Typical usage will be :-
- * while(cell_scanner.Advance()){
- * auto current_cell = cell_scanner.Current();
- * }
- */
- virtual bool Advance() = 0;
-
- /**
- * @brief returns the current cell
- */
- virtual const std::shared_ptr<Cell> Current() const = 0;
-};
-
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
deleted file mode 100644
index 1856047..0000000
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <folly/io/IOBuf.h>
-#include <gtest/gtest.h>
-
-#include "if/Client.pb.h"
-#include "rpc-serde.h"
-
-using namespace hbase;
-using folly::IOBuf;
-using hbase::pb::GetRequest;
-using hbase::pb::RegionSpecifier;
-using hbase::pb::RegionSpecifier_RegionSpecifierType;
-
-TEST(TestRpcSerde, TestReturnFalseOnNullPtr) {
- RpcSerde deser{nullptr};
- ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0);
-}
-
-TEST(TestRpcSerde, TestReturnFalseOnBadInput) {
- RpcSerde deser{nullptr};
- auto buf = IOBuf::copyBuffer("test");
- GetRequest gr;
-
- ASSERT_LT(deser.ParseDelimited(buf.get(), &gr), 0);
-}
-
-TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) {
- GetRequest in;
- RpcSerde ser{nullptr};
- RpcSerde deser{nullptr};
-
- // fill up the GetRequest.
- in.mutable_region()->set_value("test_region_id");
- in.mutable_region()->set_type(
- RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
- in.mutable_get()->set_row("test_row");
-
- // Create the buffer
- auto buf = ser.SerializeDelimited(in);
-
- GetRequest out;
-
- int used_bytes = deser.ParseDelimited(buf.get(), &out);
-
- ASSERT_GT(used_bytes, 0);
- ASSERT_EQ(used_bytes, buf->length());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
deleted file mode 100644
index 306f2c2..0000000
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <gtest/gtest.h>
-
-#include <folly/io/Cursor.h>
-
-#include <string>
-
-#include "if/HBase.pb.h"
-#include "if/RPC.pb.h"
-#include "rpc-serde.h"
-
-using namespace hbase;
-using namespace hbase::pb;
-using namespace folly;
-using namespace folly::io;
-
-TEST(RpcSerdeTest, PreambleIncludesHBas) {
- RpcSerde ser{nullptr};
- auto buf = ser.Preamble(false);
- const char *p = reinterpret_cast<const char *>(buf->data());
- // Take the first for chars and make sure they are the
- // magic string
- EXPECT_EQ("HBas", std::string(p, 4));
-
- EXPECT_EQ(6, buf->computeChainDataLength());
-}
-
-TEST(RpcSerdeTest, PreambleIncludesVersion) {
- RpcSerde ser{nullptr};
- auto buf = ser.Preamble(false);
- EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
- EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
-}
-
-TEST(RpcSerdeTest, TestHeaderLengthPrefixed) {
- RpcSerde ser{nullptr};
- auto header = ser.Header("elliott");
-
- // The header should be prefixed by 4 bytes of length.
- EXPECT_EQ(4, header->length());
- EXPECT_TRUE(header->length() < header->computeChainDataLength());
- EXPECT_TRUE(header->isChained());
-
- // Now make sure the length is correct.
- Cursor cursor(header.get());
- auto prefixed_len = cursor.readBE<uint32_t>();
- EXPECT_EQ(prefixed_len, header->next()->length());
-}
-
-TEST(RpcSerdeTest, TestHeaderDecode) {
- RpcSerde ser{nullptr};
- auto buf = ser.Header("elliott");
- auto header_buf = buf->next();
- ConnectionHeader h;
-
- EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length()));
- EXPECT_EQ("elliott", h.user_info().effective_user());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/codec.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/codec.h b/hbase-native-client/serde/codec.h
deleted file mode 100644
index 64807dc..0000000
--- a/hbase-native-client/serde/codec.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/io/IOBuf.h>
-#include <memory>
-
-#include "serde/cell-outputstream.h"
-#include "serde/cell-scanner.h"
-
-namespace hbase {
-
-/**
- * @brief Encoder / Decoder for Cells.
- */
-class Codec {
- public:
- virtual ~Codec() {}
-
- class Encoder : public CellOutputStream {};
-
- class Decoder : public CellScanner {};
-
- virtual std::unique_ptr<Encoder> CreateEncoder() = 0;
- virtual std::unique_ptr<Decoder> CreateDecoder(std::unique_ptr<folly::IOBuf> cell_block,
- uint32_t cell_block_start_offset,
- uint32_t cell_block_length) = 0;
-
- /** @brief returns the java class name corresponding to this Codec implementation */
- virtual const char* java_class_name() const = 0;
-};
-
-} /* namespace hbase */
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/region-info-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc
deleted file mode 100644
index 5cb8482..0000000
--- a/hbase-native-client/serde/region-info-deserializer-test.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/region-info.h"
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "if/HBase.pb.h"
-#include "serde/table-name.h"
-
-using std::string;
-using hbase::pb::RegionInfo;
-using hbase::pb::TableName;
-
-TEST(TestRegionInfoDesializer, TestDeserialize) {
- string ns{"test_ns"};
- string tn{"table_name"};
- string start_row{"AAAAAA"};
- string stop_row{"BBBBBBBBBBBB"};
- uint64_t region_id = 2345678;
-
- RegionInfo ri_out;
- ri_out.set_region_id(region_id);
- ri_out.mutable_table_name()->set_namespace_(ns);
- ri_out.mutable_table_name()->set_qualifier(tn);
- ri_out.set_start_key(start_row);
- ri_out.set_end_key(stop_row);
-
- string header{"PBUF"};
- string ser = header + ri_out.SerializeAsString();
-
- auto out = folly::to<RegionInfo>(ser);
-
- EXPECT_EQ(region_id, out.region_id());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/region-info.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
deleted file mode 100644
index 8010042..0000000
--- a/hbase-native-client/serde/region-info.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/Conv.h>
-#include <boost/algorithm/string/predicate.hpp>
-
-#include <string>
-
-#include "if/HBase.pb.h"
-
-namespace hbase {
-namespace pb {
-template <class String>
-void parseTo(String in, RegionInfo &out) {
- // TODO(eclark): there has to be something better.
- std::string s = folly::to<std::string>(in);
-
- if (!boost::starts_with(s, "PBUF")) {
- throw std::runtime_error("Region Info field doesn't contain preamble");
- }
- if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) {
- throw std::runtime_error("Bad protobuf for RegionInfo");
- }
-}
-} // namespace pb
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/rpc-serde.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc
deleted file mode 100644
index 70a57e8..0000000
--- a/hbase-native-client/serde/rpc-serde.cc
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Conv.h>
-#include <folly/Logging.h>
-#include <folly/io/Cursor.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message.h>
-#include <boost/algorithm/string.hpp>
-
-#include <utility>
-
-#include "if/RPC.pb.h"
-#include "rpc-serde.h"
-#include "utils/version.h"
-
-using folly::IOBuf;
-using folly::io::RWPrivateCursor;
-using google::protobuf::Message;
-using google::protobuf::io::ArrayInputStream;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-using google::protobuf::io::ZeroCopyOutputStream;
-
-using namespace hbase::pb;
-
-namespace hbase {
-
-static const std::string PREAMBLE = "HBas";
-static const std::string INTERFACE = "ClientService";
-static const uint8_t RPC_VERSION = 0;
-static const uint8_t DEFAULT_AUTH_TYPE = 80;
-static const uint8_t KERBEROS_AUTH_TYPE = 81;
-
-int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
- if (buf == nullptr || msg == nullptr) {
- return -2;
- }
-
- DCHECK(!buf->isChained());
-
- ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
- CodedInputStream coded_stream{&ais};
-
- uint32_t msg_size;
-
- // Try and read the varint.
- if (coded_stream.ReadVarint32(&msg_size) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
- return -3;
- }
-
- coded_stream.PushLimit(msg_size);
- // Parse the message.
- if (msg->MergeFromCodedStream(&coded_stream) == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data.";
- return -4;
- }
-
- // Make sure all the data was consumed.
- if (coded_stream.ConsumedEntireMessage() == false) {
- FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message";
- return -5;
- }
-
- return coded_stream.CurrentPosition();
-}
-
-RpcSerde::RpcSerde() {}
-
-RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
-
-std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
- auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
- magic->append(2);
- RWPrivateCursor c(magic.get());
- c.skip(4);
- // Version
- c.write(RPC_VERSION);
- if (secure) {
- // for now support only KERBEROS (DIGEST is not supported)
- c.write(KERBEROS_AUTH_TYPE);
- } else {
- c.write(DEFAULT_AUTH_TYPE);
- }
- return magic;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) {
- pb::ConnectionHeader h;
-
- // TODO(eclark): Make this not a total lie.
- h.mutable_user_info()->set_effective_user(user);
- // The service name that we want to talk to.
- //
- // Right now we're completely ignoring the service interface.
- // That may or may not be the correct thing to do.
- // It worked for a while with the java client; until it
- // didn't.
- // TODO: send the service name and user from the RpcClient
- h.set_service_name(INTERFACE);
-
- std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo();
-
- h.set_allocated_version_info(version_info.release());
-
- if (codec_ != nullptr) {
- h.set_cell_block_codec_class(codec_->java_class_name());
- }
- return PrependLength(SerializeMessage(h));
-}
-
-std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() {
- std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>();
- version_info->set_user(Version::user);
- version_info->set_revision(Version::revision);
- version_info->set_url(Version::url);
- version_info->set_date(Version::date);
- version_info->set_src_checksum(Version::src_checksum);
- version_info->set_version(Version::version);
-
- std::string version{Version::version};
- std::vector<std::string> version_parts;
- boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on);
- uint32_t major_version = 0, minor_version = 0;
- if (version_parts.size() >= 2) {
- version_info->set_version_major(folly::to<uint32_t>(version_parts[0]));
- version_info->set_version_minor(folly::to<uint32_t>(version_parts[1]));
- }
-
- VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString();
- return version_info;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method,
- const Message *msg) {
- pb::RequestHeader rq;
- rq.set_method_name(method);
- rq.set_call_id(call_id);
- rq.set_request_param(msg != nullptr);
- auto ser_header = SerializeDelimited(rq);
- if (msg != nullptr) {
- auto ser_req = SerializeDelimited(*msg);
- ser_header->appendChain(std::move(ser_req));
- }
-
- return PrependLength(std::move(ser_header));
-}
-
-std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
- const google::protobuf::Message *msg) {
- pb::ResponseHeader rh;
- rh.set_call_id(call_id);
- auto ser_header = SerializeDelimited(rh);
- auto ser_resp = SerializeDelimited(*msg);
- ser_header->appendChain(std::move(ser_resp));
-
- return PrependLength(std::move(ser_header));
-}
-
-std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
- const google::protobuf::Message *msg,
- const folly::exception_wrapper &exception) {
- /* create ResponseHeader */
- pb::ResponseHeader rh;
- rh.set_call_id(call_id);
-
- /* create ExceptionResponse */
- if (bool(exception)) {
- VLOG(1) << "packing ExceptionResponse";
- auto exception_response = new pb::ExceptionResponse();
- exception_response->set_exception_class_name(exception.class_name().c_str());
- exception_response->set_stack_trace(exception.what().c_str());
- rh.set_allocated_exception(exception_response);
- }
-
- /* serialize Response header and body */
- auto ser_header = SerializeDelimited(rh);
- auto ser_resp = SerializeDelimited(*msg);
- ser_header->appendChain(std::move(ser_resp));
-
- VLOG(3) << "Converted hbase::Response to folly::IOBuf";
- return PrependLength(std::move(ser_header));
-}
-
-std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
- uint32_t offset, uint32_t length) {
- if (codec_ == nullptr) {
- return nullptr;
- }
- return codec_->CreateDecoder(std::move(buf), offset, length);
-}
-
-std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) {
- // Java ints are 4 long. So create a buffer that large
- auto len_buf = IOBuf::create(4);
- // Then make those bytes visible.
- len_buf->append(4);
-
- RWPrivateCursor c(len_buf.get());
- // Get the size of the data to be pushed out the network.
- auto size = msg->computeChainDataLength();
-
- // Write the length to this IOBuf.
- c.writeBE(static_cast<uint32_t>(size));
-
- // Then attach the origional to the back of len_buf
- len_buf->appendChain(std::move(msg));
- return len_buf;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
- // Get the buffer size needed for just the message.
- int msg_size = msg.ByteSize();
- int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
-
- // Create a buffer big enough to hold the varint and the object.
- auto buf = IOBuf::create(buf_size);
- buf->append(buf_size);
-
- // Create the array output stream.
- ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
- // Wrap the ArrayOuputStream in the coded output stream to allow writing
- // Varint32
- CodedOutputStream cos{&aos};
-
- // Write out the size.
- cos.WriteVarint32(msg_size);
-
- // Now write the rest out.
- // We're using the protobuf output streams here to keep track
- // of where in the output array we are rather than IOBuf.
- msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size));
-
- // Return the buffer.
- return buf;
-}
-// TODO(eclark): Make this 1 copy.
-std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
- auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
- return buf;
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/rpc-serde.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h
deleted file mode 100644
index 6941f62..0000000
--- a/hbase-native-client/serde/rpc-serde.h
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include <folly/ExceptionWrapper.h>
-#include "if/HBase.pb.h"
-#include "serde/cell-scanner.h"
-#include "serde/codec.h"
-
-using namespace folly;
-// Forward
-namespace folly {
-class IOBuf;
-}
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace hbase {
-
-/**
- * @brief Class for serializing a deserializing rpc formatted data.
- *
- * RpcSerde is the one stop shop for reading/writing data to HBase daemons.
- * It should throw exceptions if anything goes wrong.
- */
-class RpcSerde {
- public:
- RpcSerde();
- /**
- * Constructor assumes the default auth type.
- */
- RpcSerde(std::shared_ptr<Codec> codec);
-
- /**
- * Destructor. This is provided just for testing purposes.
- */
- virtual ~RpcSerde() = default;
-
- /**
- * Pase a message in the delimited format.
- *
- * A message in delimited format consists of the following:
- *
- * - a protobuf var int32.
- * - A protobuf object serialized.
- */
- int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
-
- /**
- * Create a new connection preamble in a new IOBuf.
- */
- static std::unique_ptr<folly::IOBuf> Preamble(bool secure);
-
- /**
- * Create the header protobuf object and serialize it to a new IOBuf.
- * Header is in the following format:
- *
- * - Big endian length
- * - ConnectionHeader object serialized out.
- */
- std::unique_ptr<folly::IOBuf> Header(const std::string &user);
-
- /**
- * Take ownership of the passed buffer, and create a CellScanner using the
- * Codec class to parse Cells out of the wire.
- */
- std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
- uint32_t length);
-
- /**
- * Serialize a request message into a protobuf.
- * Request consists of:
- *
- * - Big endian length
- * - RequestHeader object
- * - The passed in Message object
- */
- std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method,
- const google::protobuf::Message *msg);
-
- /**
- * Serialize a response message into a protobuf.
- * Request consists of:
- *
- * - Big endian length
- * - ResponseHeader object
- * - The passed in Message object
- */
- std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
- const google::protobuf::Message *msg);
-
- /**
- * Serialize a response message into a protobuf.
- * Request consists of:
- *
- * - Big endian length
- * - ResponseHeader object
- * - The passed in hbase::Response object
- */
- std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
- const google::protobuf::Message *msg,
- const folly::exception_wrapper &exception);
-
- /**
- * Serialize a message in the delimited format.
- * Delimited format consists of the following:
- *
- * - A protobuf var int32
- * - The message object seriailized after that.
- */
- std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg);
-
- /**
- * Serilalize a message. This does not add any length prepend.
- */
- std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg);
-
- /**
- * Prepend a length IOBuf to the given IOBuf chain.
- * This involves no copies or moves of the passed in data.
- */
- std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
-
- public:
- static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode";
- static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false;
-
- private:
- /* data */
- std::shared_ptr<Codec> codec_;
- std::unique_ptr<pb::VersionInfo> CreateVersionInfo();
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/server-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/server-name-test.cc b/hbase-native-client/serde/server-name-test.cc
deleted file mode 100644
index 87c493a..0000000
--- a/hbase-native-client/serde/server-name-test.cc
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/server-name.h"
-
-#include <gtest/gtest.h>
-#include <string>
-
-using hbase::pb::ServerName;
-
-TEST(TestServerName, TestMakeServerName) {
- auto sn = folly::to<ServerName>("test:123");
-
- ASSERT_EQ("test", sn.host_name());
- ASSERT_EQ(123, sn.port());
-}
-
-TEST(TestServerName, TestIps) {
- auto sn = folly::to<ServerName>("127.0.0.1:999");
- ASSERT_EQ("127.0.0.1", sn.host_name());
- ASSERT_EQ(999, sn.port());
-}
-
-TEST(TestServerName, TestThrow) { ASSERT_ANY_THROW(folly::to<ServerName>("Ther's no colon here")); }
-
-TEST(TestServerName, TestIPV6) {
- auto sn = folly::to<ServerName>("[::::1]:123");
-
- ASSERT_EQ("[::::1]", sn.host_name());
- ASSERT_EQ(123, sn.port());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/server-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/server-name.h b/hbase-native-client/serde/server-name.h
deleted file mode 100644
index 41e3c77..0000000
--- a/hbase-native-client/serde/server-name.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/String.h>
-
-#include <string>
-
-#include "if/HBase.pb.h"
-
-namespace hbase {
-namespace pb {
-
-template <class String>
-void parseTo(String in, ServerName &out) {
- // TODO see about getting rsplit into folly.
- std::string s = folly::to<std::string>(in);
-
- auto delim = s.rfind(":");
- if (delim == std::string::npos) {
- throw std::runtime_error("Couldn't parse server name");
- }
- out.set_host_name(s.substr(0, delim));
- // Now keep everything after the : (delim + 1) to the end.
- out.set_port(folly::to<int>(s.substr(delim + 1)));
-}
-
-} // namespace pb
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/table-name-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/table-name-test.cc b/hbase-native-client/serde/table-name-test.cc
deleted file mode 100644
index 877d522..0000000
--- a/hbase-native-client/serde/table-name-test.cc
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <folly/Conv.h>
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "serde/table-name.h"
-
-using namespace hbase;
-using hbase::pb::TableName;
-
-TEST(TestTableName, TestToStringNoDefault) {
- TableName tn;
- tn.set_qualifier("TestTableName");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("default"), std::string::npos);
- ASSERT_EQ("TestTableName", result);
-}
-
-TEST(TestTableName, TestToStringNoDefaltWhenSet) {
- TableName tn;
- tn.set_namespace_("default");
- tn.set_qualifier("TestTableName");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("default"), std::string::npos);
- ASSERT_EQ("TestTableName", result);
-}
-
-TEST(TestTableName, TestToStringIncludeNS) {
- TableName tn;
- tn.set_namespace_("hbase");
- tn.set_qualifier("acl");
- std::string result = folly::to<std::string>(tn);
- ASSERT_EQ(result.find("hbase"), 0);
- ASSERT_EQ("hbase:acl", result);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/table-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h
deleted file mode 100644
index 3594802..0000000
--- a/hbase-native-client/serde/table-name.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <folly/Conv.h>
-#include <folly/String.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "if/HBase.pb.h"
-
-namespace hbase {
-namespace pb {
-
-// Provide folly::to<std::string>(TableName);
-template <class String>
-void toAppend(const TableName &in, String *result) {
- if (!in.has_namespace_() || in.namespace_() == "default") {
- folly::toAppend(in.qualifier(), result);
- } else {
- folly::toAppend(in.namespace_(), ':', in.qualifier(), result);
- }
-}
-
-template <class String>
-void parseTo(String in, TableName &out) {
- std::vector<std::string> v;
- folly::split(":", in, v);
-
- if (v.size() == 1) {
- out.set_namespace_("default");
- out.set_qualifier(v[0]);
- } else {
- out.set_namespace_(v[0]);
- out.set_qualifier(v[1]);
- }
-}
-
-} // namespace pb
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/zk-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk-deserializer-test.cc b/hbase-native-client/serde/zk-deserializer-test.cc
deleted file mode 100644
index f07eecf..0000000
--- a/hbase-native-client/serde/zk-deserializer-test.cc
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/zk.h"
-
-#include <folly/Logging.h>
-#include <folly/io/Cursor.h>
-#include <folly/io/IOBuf.h>
-#include <gtest/gtest.h>
-
-#include "if/ZooKeeper.pb.h"
-
-using namespace hbase;
-using namespace hbase::pb;
-using namespace folly;
-using namespace std;
-using namespace folly::io;
-
-// Test that would test if there's nothing there.
-TEST(TestZkDesializer, TestThrowNoMagicNum) {
- ZkDeserializer deser;
- MetaRegionServer mrs;
-
- auto buf = IOBuf::create(100);
- buf->append(100);
- RWPrivateCursor c{buf.get()};
- c.write<uint8_t>(99);
- ASSERT_THROW(deser.Parse(buf.get(), &mrs), runtime_error);
-}
-
-// Test if the protobuf is in a format that we can't decode
-TEST(TestZkDesializer, TestBadProtoThrow) {
- ZkDeserializer deser;
- MetaRegionServer mrs;
- string magic{"PBUF"};
-
- // Set ServerName
- mrs.mutable_server()->set_host_name("test");
- mrs.mutable_server()->set_port(567);
- mrs.mutable_server()->set_start_code(9567);
-
- // One byte magic number
- // four bytes for id length
- // four bytes for id
- // four bytes for PBUF
- uint32_t start_len = 1 + 4 + 4 + 4;
- // How large the protobuf will be
- uint32_t pbuf_size = mrs.ByteSize();
-
- auto buf = IOBuf::create(start_len + pbuf_size);
- buf->append(start_len + pbuf_size);
- RWPrivateCursor c{buf.get()};
-
- // Write the magic number
- c.write<uint8_t>(255);
- // Write the id len
- c.writeBE<uint32_t>(4);
- // Write the id
- c.write<uint32_t>(13);
- // Write the PBUF string
- c.push(reinterpret_cast<const uint8_t *>(magic.c_str()), 4);
-
- // Create the protobuf
- MetaRegionServer out;
- ASSERT_THROW(deser.Parse(buf.get(), &out), runtime_error);
-}
-
-// Test to make sure the whole thing works.
-TEST(TestZkDesializer, TestNoThrow) {
- ZkDeserializer deser;
- MetaRegionServer mrs;
- string magic{"PBUF"};
-
- // Set ServerName
- mrs.mutable_server()->set_host_name("test");
- mrs.mutable_server()->set_port(567);
- mrs.mutable_server()->set_start_code(9567);
-
- // One byte magic number
- // four bytes for id length
- // four bytes for id
- // four bytes for PBUF
- uint32_t start_len = 1 + 4 + 4 + 4;
- // How large the protobuf will be
- uint32_t pbuf_size = mrs.ByteSize();
-
- auto buf = IOBuf::create(start_len + pbuf_size);
- buf->append(start_len + pbuf_size);
- RWPrivateCursor c{buf.get()};
-
- // Write the magic number
- c.write<uint8_t>(255);
- // Write the id len
- c.writeBE<uint32_t>(4);
- // Write the id
- c.write<uint32_t>(13);
- // Write the PBUF string
- c.push(reinterpret_cast<const uint8_t *>(magic.c_str()), 4);
-
- // Now write the serialized protobuf
- mrs.SerializeWithCachedSizesToArray(buf->writableData() + start_len);
-
- // Create the protobuf
- MetaRegionServer out;
- ASSERT_TRUE(deser.Parse(buf.get(), &out));
- ASSERT_EQ(mrs.server().host_name(), out.server().host_name());
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/zk.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk.cc b/hbase-native-client/serde/zk.cc
deleted file mode 100644
index a71eb87..0000000
--- a/hbase-native-client/serde/zk.cc
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/zk.h"
-
-#include <folly/io/Cursor.h>
-#include <folly/io/IOBuf.h>
-#include <google/protobuf/message.h>
-
-#include <string>
-
-using std::runtime_error;
-
-namespace hbase {
-
-static const std::string MAGIC_STRING = "PBUF";
-
-bool ZkDeserializer::Parse(folly::IOBuf *buf, google::protobuf::Message *out) {
- // The format is like this
- // 1 byte of magic number. 255
- // 4 bytes of id length.
- // id_length number of bytes for the id of who put up the znode
- // 4 bytes of a magic string PBUF
- // Then the protobuf serialized without a varint header.
-
- folly::io::Cursor c{buf};
-
- // There should be a magic number for recoverable zk
- uint8_t magic_num = c.read<uint8_t>();
- if (magic_num != 255) {
- LOG(ERROR) << "Magic number not in ZK znode data expected 255 got =" << unsigned(magic_num);
- throw runtime_error("Magic number not in znode data");
- }
- // How long is the id?
- uint32_t id_len = c.readBE<uint32_t>();
-
- if (id_len >= c.length()) {
- LOG(ERROR) << "After skiping the if from zookeeper data there's not enough "
- "left to read anything else";
- throw runtime_error("Not enough bytes to decode from zookeeper");
- }
-
- // Skip the id
- c.skip(id_len);
-
- // Make sure that the magic string is there.
- if (MAGIC_STRING != c.readFixedString(4)) {
- LOG(ERROR) << "There was no PBUF magic string.";
- throw runtime_error("No PBUF magic string in the zookpeeper data.");
- }
-
- // Try to decode the protobuf.
- // If there's an error bail out.
- if (out->ParseFromArray(c.data(), c.length()) == false) {
- LOG(ERROR) << "Error parsing Protobuf Message";
- throw runtime_error("Error parsing protobuf");
- }
-
- return true;
-}
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/serde/zk.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/zk.h b/hbase-native-client/serde/zk.h
deleted file mode 100644
index 5cadec2..0000000
--- a/hbase-native-client/serde/zk.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-namespace folly {
-class IOBuf;
-}
-
-namespace hbase {
-
-/** @brief A class to convert data from ZooKeeper to other formats.
- *
- * This class will convert data to and from Zookeeper into protobuf objects.
- *
- */
-class ZkDeserializer {
- public:
- /**
- * Merge the data from a buffer into a given message.
- *
- * @param buf Naked pointer to iobuf containing data read from zookeeper.
- * @param out Naked pointer into which the data will be merged. The message
- * should be the correct type.
- * @return returns true if the parsing was successful.
- */
- bool Parse(folly::IOBuf *buf, google::protobuf::Message *out);
-};
-} // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/BUCK b/hbase-native-client/src/hbase/client/BUCK
new file mode 100644
index 0000000..1a8f434
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/BUCK
@@ -0,0 +1,301 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the main library.
+cxx_library(
+ name="client",
+ srcs=[
+ "async-client-scanner.cc",
+ "async-connection.cc",
+ "async-rpc-retrying-caller-factory.cc",
+ "async-rpc-retrying-caller.cc",
+ "async-scan-rpc-retrying-caller.cc",
+ "async-table-result-scanner.cc",
+ "cell.cc",
+ "client.cc",
+ "hbase-rpc-controller.cc",
+ "keyvalue-codec.cc",
+ "location-cache.cc",
+ "meta-utils.cc",
+ "increment.cc",
+ "get.cc",
+ "mutation.cc",
+ "put.cc",
+ "delete.cc",
+ "scan.cc",
+ "append.cc",
+ "scan-result-cache.cc",
+ "raw-async-table.cc",
+ "result.cc",
+ "request-converter.cc",
+ "response-converter.cc",
+ "table.cc",
+ "time-range.cc",
+ "zk-util.cc",
+ "multi-response.cc",
+ "region-result.cc",
+ "async-batch-rpc-retrying-caller.cc",
+ ],
+ deps=[
+ "//include/hbase/client:client",
+ "//src/hbase/exceptions:exceptions",
+ "//src/hbase/utils:utils",
+ "//src/hbase/connection:connection",
+ "//src/hbase/client:conf",
+ "//src/hbase/if:if",
+ "//src/hbase/serde:serde",
+ "//third-party:folly",
+ "//third-party:wangle",
+ "//third-party:zookeeper_mt",
+ ],
+ compiler_flags=['-Weffc++', '-ggdb'],
+ visibility=[
+ 'PUBLIC',
+ ],)
+cxx_library(
+ name="conf",
+ exported_headers=[
+ ],
+ srcs=[
+ "configuration.cc",
+ "hbase-configuration-loader.cc",
+ ],
+ deps=["//include/hbase/client:conf", "//src/hbase/utils:utils", "//third-party:folly"],
+ compiler_flags=['-Weffc++', '-ggdb'],
+ visibility=[
+ 'PUBLIC',
+ ],)
+cxx_test(
+ name="location-cache-test",
+ srcs=[
+ "location-cache-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="location-cache-retry-test",
+ srcs=[
+ "location-cache-retry-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/if:if",
+ "//src/hbase/serde:serde",
+ "//src/hbase/test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="cell-test",
+ srcs=[
+ "cell-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="filter-test",
+ srcs=[
+ "filter-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/if:if",
+ "//src/hbase/serde:serde",
+ "//src/hbase/test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="get-test",
+ srcs=[
+ "get-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="delete-test",
+ srcs=[
+ "delete-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="increment-test",
+ srcs=[
+ "increment-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="put-test",
+ srcs=[
+ "put-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="append-test",
+ srcs=[
+ "append-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="retry-test",
+ srcs=[
+ "async-rpc-retrying-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/test-util:test-util",
+ "//src/hbase/exceptions:exceptions",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="time-range-test",
+ srcs=[
+ "time-range-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="configuration-test",
+ srcs=[
+ "configuration-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="hbase-configuration-test",
+ srcs=[
+ "hbase-configuration-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="scan-test",
+ srcs=[
+ "scan-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="result-test",
+ srcs=[
+ "result-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="request-converter-test",
+ srcs=[
+ "request-converter-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/connection:connection",
+ "//src/hbase/if:if",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="client-test",
+ srcs=[
+ "client-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/if:if",
+ "//src/hbase/serde:serde",
+ "//src/hbase/test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="scan-result-cache-test",
+ srcs=[
+ "scan-result-cache-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="scanner-test",
+ srcs=[
+ "scanner-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/if:if",
+ "//src/hbase/serde:serde",
+ "//src/hbase/test-util:test-util",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="zk-util-test",
+ srcs=[
+ "zk-util-test.cc",
+ ],
+ deps=[
+ ":client",
+ ],
+ run_test_separately=True,)
+cxx_test(
+ name="multi-retry-test",
+ srcs=[
+ "async-batch-rpc-retrying-test.cc",
+ ],
+ deps=[
+ ":client",
+ "//src/hbase/test-util:test-util",
+ "//src/hbase/exceptions:exceptions",
+ ],
+ run_test_separately=True,)
+cxx_binary(
+ name="simple-client",
+ srcs=[
+ "simple-client.cc",
+ ],
+ deps=[":client", "//src/hbase/connection:connection"],)
+cxx_binary(
+ name="load-client",
+ srcs=[
+ "load-client.cc",
+ ],
+ deps=[":client", "//src/hbase/connection:connection"],)
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/append-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/append-test.cc b/hbase-native-client/src/hbase/client/append-test.cc
new file mode 100644
index 0000000..1af138d
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/append-test.cc
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <limits>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include "hbase/client/append.h"
+#include "hbase/client/mutation.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::Append;
+using hbase::Cell;
+using hbase::CellType;
+using hbase::Mutation;
+using hbase::TimeUtil;
+
+// Out-of-class definition: EXPECT_EQ binds its arguments by reference, which odr-uses the
+// static constexpr member and therefore needs this definition before C++17.
+const constexpr int64_t Mutation::kLatestTimestamp;
+
+// The row passed to the constructor is reported back by row().
+TEST(Append, Row) {
+  Append append{"foo"};
+  EXPECT_EQ("foo", append.row());
+}
+
+// Durability starts as USE_DEFAULT and follows SetDurability().
+TEST(Append, Durability) {
+  Append append{"row"};
+  EXPECT_EQ(hbase::pb::MutationProto_Durability_USE_DEFAULT, append.Durability());
+
+  auto skipWal = hbase::pb::MutationProto_Durability_SKIP_WAL;
+  append.SetDurability(skipWal);
+  EXPECT_EQ(skipWal, append.Durability());
+}
+
+// Timestamp defaults to kLatestTimestamp, follows SetTimeStamp() and is used for later Add()s.
+TEST(Append, Timestamp) {
+  Append append{"row"};
+
+  // test default timestamp
+  EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp());
+
+  // set custom timestamp
+  auto ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos());
+  append.SetTimeStamp(ts);
+  EXPECT_EQ(ts, append.TimeStamp());
+
+  // Add a column with custom timestamp
+  append.Add("f", "q", "v");
+  auto &cell = append.FamilyMap().at("f")[0];
+  EXPECT_EQ(ts, cell->Timestamp());
+}
+
+// HasFamilies() is false for a fresh Append and true once a column has been added.
+TEST(Append, HasFamilies) {
+  Append append{"row"};
+
+  EXPECT_FALSE(append.HasFamilies());
+
+  append.Add("f", "q", "v");
+  EXPECT_TRUE(append.HasFamilies());
+}
+
+// Add(cell) groups cells by family and rejects a cell whose row differs from the Append's row.
+TEST(Append, Add) {
+  CellType cell_type = CellType::PUT;
+  std::string row = "row";
+  std::string family = "family";
+  std::string column = "column";
+  std::string value = "value";
+  int64_t timestamp = std::numeric_limits<int64_t>::max();
+  auto cell = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
+
+  // add first cell
+  Append append{"row"};
+  append.Add(std::move(cell));
+  EXPECT_EQ(1u, append.FamilyMap().size());
+  EXPECT_EQ(1u, append.FamilyMap().at(family).size());
+
+  // add a non-matching row
+  auto cell2 = std::make_unique<Cell>(row, family, column, timestamp, value, cell_type);
+  Append append2{"foo"};
+  ASSERT_THROW(append2.Add(std::move(cell2)), std::runtime_error);  // rows don't match
+
+  // add a second cell with same family
+  auto cell3 = std::make_unique<Cell>(row, family, "column-2", timestamp, value, cell_type);
+  append.Add(std::move(cell3));
+  EXPECT_EQ(1u, append.FamilyMap().size());
+  EXPECT_EQ(2u, append.FamilyMap().at(family).size());
+
+  // add a cell to a different family
+  auto cell4 = std::make_unique<Cell>(row, "family-2", "column-2", timestamp, value, cell_type);
+  append.Add(std::move(cell4));
+  EXPECT_EQ(2u, append.FamilyMap().size());
+  EXPECT_EQ(1u, append.FamilyMap().at("family-2").size());
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/append.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/append.cc b/hbase-native-client/src/hbase/client/append.cc
new file mode 100644
index 0000000..431a398
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/append.cc
@@ -0,0 +1,65 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/append.h"
+#include <folly/Conv.h>
+#include <algorithm>
+#include <limits>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+namespace hbase {
+
+/**
+ * @brief Append to the column from the specific family with the specified qualifier
+ * @param family family name
+ * @param qualifier column qualifier
+ * @param value value to append
+ * @return this Append, so that calls can be chained
+ */
+Append& Append::Add(const std::string& family, const std::string& qualifier,
+                    const std::string& value) {
+  // make_unique already returns a prvalue, so no std::move is needed to hand it to push_back.
+  family_map_[family].push_back(
+      std::make_unique<Cell>(row_, family, qualifier, timestamp_, value, hbase::CellType::PUT));
+  return *this;
+}
+
+/**
+ * @brief Append an already constructed cell
+ * @param cell cell to append; its row must be equal to the row of this Append
+ * @return this Append, so that calls can be chained
+ * @throws std::runtime_error if the row of the cell differs from the row of this Append
+ */
+Append& Append::Add(std::unique_ptr<Cell> cell) {
+  if (cell->Row() != row_) {
+    throw std::runtime_error("The row in " + cell->DebugString() +
+                             " doesn't match the original one " + row_);
+  }
+
+  // std::move is only a cast here: the family is read before push_back takes the cell over.
+  family_map_[cell->Family()].push_back(std::move(cell));
+  return *this;
+}
+
+}  // namespace hbase
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-caller.cc b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-caller.cc
new file mode 100644
index 0000000..6699b90
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/async-batch-rpc-retrying-caller.cc
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/async-batch-rpc-retrying-caller.h"
+#include <glog/logging.h>
+#include <limits>
+
+using folly::Future;
+using folly::Promise;
+using folly::Try;
+using hbase::pb::ServerName;
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::nanoseconds;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+/**
+ * Builds a batch caller for the given requests.
+ *
+ * Stores the retry and timeout settings, takes the region locator, RPC client and CPU
+ * executor from the connection, and creates one Action plus one promise/future pair per
+ * request, keyed by the position of the request in `actions`.
+ */
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller(
+    std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer,
+    std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns,
+    int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns,
+    int32_t start_log_errors_count)
+    : conn_(conn),
+      retry_timer_(retry_timer),
+      table_name_(table_name),
+      pause_ns_(pause_ns),
+      operation_timeout_ns_(operation_timeout_ns),
+      rpc_timeout_ns_(rpc_timeout_ns),
+      start_log_errors_count_(start_log_errors_count) {
+  CHECK(conn_ != nullptr);
+  CHECK(retry_timer_ != nullptr);
+  location_cache_ = conn_->region_locator();
+  rpc_client_ = conn_->rpc_client();
+  cpu_pool_ = conn_->cpu_executor();
+  CHECK(location_cache_ != nullptr);
+  CHECK(rpc_client_ != nullptr);
+  CHECK(cpu_pool_ != nullptr);
+
+  max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts);
+
+  // The position of a request in `actions` is its original index; promises are keyed by it.
+  for (uint32_t index = 0; index < actions.size(); ++index) {
+    actions_.push_back(std::make_shared<Action>(actions[index], index));
+    action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, Promise<RESP>{}));
+    action2futures_.push_back(action2promises_[index].getFuture());
+  }
+}
+
+/** Nothing to release explicitly; the members clean up after themselves. */
+template <typename REQ, typename RESP>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() = default;
+
+/**
+ * Starts the batch: groups and sends all actions as attempt number 1.
+ *
+ * @return a future that collects the outcome (value or exception) of every action
+ */
+template <typename REQ, typename RESP>
+Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() {
+  const int32_t first_attempt = 1;
+  GroupAndSend(actions_, first_attempt);
+  return collectAll(action2futures_);
+}
+
+/**
+ * @return the operation timeout minus the nanoseconds elapsed since start_ns_; the value is
+ *         zero or negative once the timeout has been used up
+ */
+template <typename REQ, typename RESP>
+int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() {
+  const auto elapsed_ns = TimeUtil::GetNowNanos() - start_ns_;
+  return operation_timeout_ns_.count() - elapsed_ns;
+}
+
+/**
+ * Logs a warning about a failed batch for a single region.
+ *
+ * Nothing is logged until `tries` exceeds start_log_errors_count_.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
+    int32_t tries, std::shared_ptr<RegionRequest> region_request,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
+  if (tries <= start_log_errors_count_) {
+    return;
+  }
+
+  const std::string regions = region_request->region_location()->region_name() + ", ";
+  LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+               << table_name_->qualifier() << " from " << server_name->host_name()
+               << " failed, tries=" << tries << ":- " << ew.what().toStdString();
+}
+
+/**
+ * Logs a warning about a failed batch that spans several regions.
+ *
+ * Nothing is logged until `tries` exceeds start_log_errors_count_. The message uses the same
+ * ":- " separator between the try count and the exception text as the single-region overload.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException(
+    int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
+  if (tries > start_log_errors_count_) {
+    std::string regions;
+    // Iterate by reference so no shared_ptr is copied per region.
+    for (const auto &region_request : region_requests) {
+      regions += region_request->region_location()->region_name() + ", ";
+    }
+    LOG(WARNING) << "Process batch for " << regions << " in " << table_name_->namespace_() << ":"
+                 << table_name_->qualifier() << " from " << server_name->host_name()
+                 << " failed, tries=" << tries << ":- " << ew.what().toStdString();
+  }
+}
+
+/**
+ * @return the short debug string of `server_name`, or an empty string when it is null
+ */
+template <typename REQ, typename RESP>
+const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError(
+    std::shared_ptr<ServerName> server_name) {
+  if (!server_name) {
+    return std::string();
+  }
+  return server_name->ShortDebugString();
+}
+
+/**
+ * Records `ew` for one action, stamped with the current time and the server context, under
+ * the action's original index.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action,
+                                                      const folly::exception_wrapper &ew,
+                                                      std::shared_ptr<ServerName> server_name) {
+  const ThrowableWithExtraContext error_with_context(ew, TimeUtil::GetNowNanos(),
+                                                     GetExtraContextForError(server_name));
+  AddAction2Error(action->original_index(), error_with_context);
+}
+
+/** Records `ew` for every action in `actions`; see the single-action overload. */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(
+    const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew,
+    std::shared_ptr<ServerName> server_name) {
+  for (const auto &each_action : actions) {
+    AddError(each_action, ew, server_name);
+  }
+}
+
+/**
+ * Fails the promise of one action with a RetriesExhaustedException.
+ *
+ * Does nothing when the promise of the action is already fulfilled. Otherwise the error is
+ * appended to the errors recorded for the action, and the exception carries `tries - 1`
+ * together with that error list.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action,
+                                                     int32_t tries,
+                                                     const folly::exception_wrapper &ew,
+                                                     int64_t current_time,
+                                                     const std::string extras) {
+  const auto action_index = action->original_index();
+  auto promise_itr = action2promises_.find(action_index);
+  const bool already_done =
+      promise_itr != action2promises_.end() && promise_itr->second.isFulfilled();
+  if (already_done) {
+    return;
+  }
+
+  AddAction2Error(action_index, ThrowableWithExtraContext(ew, current_time, extras));
+  auto &promise = action2promises_[action_index];
+  promise.setException(RetriesExhaustedException(tries - 1, action2errors_[action_index]));
+}
+
+/**
+ * Fails every action in `actions` through FailOne(), using the current time and the server
+ * context for each recorded error.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries,
+    const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) {
+  for (const auto &each_action : actions) {
+    const auto now_ns = TimeUtil::GetNowNanos();
+    FailOne(each_action, tries, ew, now_ns, GetExtraContextForError(server_name));
+  }
+}
+
+/**
+ * Fails every not yet fulfilled action in `actions` with a RetriesExhaustedException that
+ * carries `tries` and the errors recorded so far for the action.
+ *
+ * An already fulfilled promise is skipped and the remaining actions are still processed.
+ * Stopping at the first fulfilled promise would leave the promises of the remaining actions
+ * unfulfilled.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
+  for (const auto &action : actions) {
+    auto action_index = action->original_index();
+    auto itr = action2promises_.find(action_index);
+    // Check against end() before dereferencing, as FailOne() does.
+    if (itr != action2promises_.end() && itr->second.isFulfilled()) {
+      continue;
+    }
+    action2promises_[action_index].setException(
+        RetriesExhaustedException(tries, action2errors_[action_index]));
+  }
+}
+
+/**
+ * Appends `twec` to the error list kept for `action_index`, creating the list on first use.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error(
+    uint64_t action_index, const ThrowableWithExtraContext &twec) {
+  auto known = action2errors_.find(action_index);
+  if (known == action2errors_.end()) {
+    // First error for this action: create its list, then record the error.
+    auto &fresh_list = action2errors_[action_index];
+    fresh_list = std::make_shared<std::vector<ThrowableWithExtraContext>>();
+    fresh_list->push_back(twec);
+    return;
+  }
+  known->second->push_back(twec);
+}
+
+/**
+ * Handles a failure that affects all regions of one server request.
+ *
+ * Logs the exception, then either fails all affected actions (attempts exhausted or the
+ * exception is not retryable) or records the error and resubmits them.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region,
+                                                     int32_t tries,
+                                                     const folly::exception_wrapper &ew,
+                                                     std::shared_ptr<ServerName> server_name) {
+  // Flatten the per-region structure into one list of requests and one list of actions.
+  std::vector<std::shared_ptr<RegionRequest>> region_requests;
+  std::vector<std::shared_ptr<Action>> copied_actions;
+  for (const auto &entry : actions_by_region) {
+    const auto &region_request = entry.second;
+    region_requests.push_back(region_request);
+    const auto &region_actions = region_request->actions();
+    copied_actions.insert(copied_actions.end(), region_actions.begin(), region_actions.end());
+  }
+
+  LogException(tries, region_requests, ew, server_name);
+
+  const bool retryable = (tries < max_attempts_) && ExceptionUtil::ShouldRetry(ew);
+  if (!retryable) {
+    FailAll(copied_actions, tries, ew, server_name);
+    return;
+  }
+  AddError(copied_actions, ew, server_name);
+  TryResubmit(copied_actions, tries);
+}
+
+/**
+ * Schedules another GroupAndSend() round, with `tries + 1`, for `actions` after a pause.
+ *
+ * The pause comes from ConnectionUtils::GetPauseTime(pause, tries - 1). When an operation
+ * timeout is configured, the pause is capped at the remaining time minus
+ * ConnectionUtils::kSleepDeltaNs; if that cap is not positive the actions are failed instead.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
+  // Without an operation timeout there is no cap on the pause.
+  int64_t max_delay_ns = std::numeric_limits<int64_t>::max();
+  if (operation_timeout_ns_.count() > 0) {
+    max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs;
+    if (max_delay_ns <= 0) {
+      FailAll(actions, tries);
+      return;
+    }
+  }
+  const int64_t delay_ns =
+      std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries - 1));
+
+  // The timer is armed from the retry executor; when it fires, the resend is handed to the
+  // CPU executor.
+  conn_->retry_executor()->add([=]() {
+    retry_timer_->scheduleTimeoutFn(
+        [=]() { conn_->cpu_executor()->add([=]() { GroupAndSend(actions, tries + 1); }); },
+        milliseconds(TimeUtil::ToMillis(delay_ns)));
+  });
+}
+
+/**
+ * Starts one region lookup per action (by the row of the action, RegionLocateType::kCurrent)
+ * and returns a future that collects all lookup outcomes, in the order of `actions`.
+ */
+template <typename REQ, typename RESP>
+Future<std::vector<Try<std::shared_ptr<RegionLocation>>>>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations(
+    const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) {
+  std::vector<Future<std::shared_ptr<RegionLocation>>> pending_lookups;
+  for (const auto &each_action : actions) {
+    const auto &row_key = each_action->action()->row();
+    pending_lookups.push_back(location_cache_->LocateRegion(
+        *table_name_, row_key, RegionLocateType::kCurrent, locate_timeout_ns));
+  }
+
+  return collectAll(pending_lookups);
+}
+
+/**
+ * Locates the region of every action, groups the located actions by server and sends them.
+ *
+ * Actions whose lookup failed are either failed (attempts exhausted or not retryable) or
+ * collected and resubmitted. If an operation timeout is configured and already used up, all
+ * actions are failed without any lookup.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend(
+    const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) {
+  int64_t locate_timeout_ns;
+  if (operation_timeout_ns_.count() > 0) {
+    locate_timeout_ns = RemainingTimeNs();
+    if (locate_timeout_ns <= 0) {
+      FailAll(actions, tries);
+      return;
+    }
+  } else {
+    locate_timeout_ns = -1L;
+  }
+
+  GetRegionLocations(actions, locate_timeout_ns)
+      .then([=](std::vector<Try<std::shared_ptr<RegionLocation>>> &loc) {
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        ActionsByServer actions_by_server;
+        std::vector<std::shared_ptr<Action>> locate_failed;
+
+        for (uint64_t i = 0; i < loc.size(); ++i) {
+          auto action = actions[i];
+          if (loc[i].hasValue()) {
+            auto region_loc = loc[i].value();
+            // Build the server key once; it serves both the lookup and the insertion.
+            auto server_name = std::make_shared<ServerName>(region_loc->server_name());
+            auto search = actions_by_server.find(server_name);
+            if (search != actions_by_server.end()) {
+              search->second->AddActionsByRegion(region_loc, action);
+            } else {
+              auto server_request = std::make_shared<ServerRequest>(region_loc);
+              server_request->AddActionsByRegion(region_loc, action);
+              actions_by_server[server_name] = server_request;
+            }
+            VLOG(5) << "rowkey [" << action->action()->row() << "] of table["
+                    << table_name_->ShortDebugString() << "] found in ["
+                    << region_loc->region_name() << "]; RS["
+                    << region_loc->server_name().host_name() << ":"
+                    << region_loc->server_name().port() << "];";
+          } else if (loc[i].hasException()) {
+            folly::exception_wrapper ew = loc[i].exception();
+            VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+                    << " for index:" << i << "; tries: " << tries
+                    << "; max_attempts_: " << max_attempts_;
+            // We might receive runtime error from location-cache.cc too, we are doing FailOne and
+            // continue next one
+            if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+              FailOne(action, tries, ew, TimeUtil::GetNowNanos(), ew.what().toStdString());
+            } else {
+              AddError(action, ew, nullptr);
+              locate_failed.push_back(action);
+            }
+          }
+        }
+        if (!actions_by_server.empty()) {
+          Send(actions_by_server, tries);
+        }
+
+        if (!locate_failed.empty()) {
+          TryResubmit(locate_failed, tries);
+        }
+      })
+      .onError([=](const folly::exception_wrapper &ew) {
+        VLOG(1) << "GetRegionLocations() exception: " << ew.what().toStdString()
+                << "; tries: " << tries << "; max_attempts_: " << max_attempts_;
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(ew)) {
+          FailAll(actions, tries, ew, nullptr);
+        } else {
+          TryResubmit(actions, tries);
+        }
+      });
+  return;
+}
+
+/**
+ * Issues one asynchronous "ClientService" multi request per server, as the default user, and
+ * returns a future that collects all responses in the iteration order of `actions_by_server`.
+ */
+template <typename REQ, typename RESP>
+Future<std::vector<Try<std::unique_ptr<Response>>>>
+AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) {
+  std::vector<Future<std::unique_ptr<hbase::Response>>> pending_calls;
+  auto user = User::defaultUser();
+  for (const auto &entry : actions_by_server) {
+    const auto &server = entry.first;
+    std::unique_ptr<Request> multi_req =
+        RequestConverter::ToMultiRequest(entry.second->actions_by_region());
+    auto host = server->host_name();
+    int port = server->port();
+    auto pending_call =
+        rpc_client_->AsyncCall(host, port, std::move(multi_req), user, "ClientService");
+    pending_calls.push_back(std::move(pending_call));
+  }
+  return collectAll(pending_calls);
+}
+
+/**
+ * Sends the grouped actions to their servers and dispatches each server's outcome to
+ * OnComplete() or OnError().
+ *
+ * If an operation timeout is configured and already used up, every action is failed without
+ * sending anything.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server,
+                                                  int32_t tries) {
+  if (operation_timeout_ns_.count() > 0 && RemainingTimeNs() <= 0) {
+    // Out of time: gather every action of every region of every server and fail them.
+    std::vector<std::shared_ptr<Action>> failed_actions;
+    for (const auto &server_entry : actions_by_server) {
+      for (auto &region_entry : server_entry.second->actions_by_region()) {
+        for (const auto &failed_action : region_entry.second->actions()) {
+          failed_actions.push_back(failed_action);
+        }
+      }
+    }
+    FailAll(failed_actions, tries);
+    return;
+  }
+
+  // Requests in server iteration order; the response handler below indexes into this list.
+  std::vector<std::shared_ptr<Request>> multi_reqv;
+  for (const auto &server_entry : actions_by_server) {
+    multi_reqv.push_back(
+        RequestConverter::ToMultiRequest(server_entry.second->actions_by_region()));
+  }
+
+  GetMultiResponse(actions_by_server)
+      .then([=](const std::vector<Try<std::unique_ptr<hbase::Response>>> &completed_responses) {
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        uint64_t position = 0;
+        for (const auto &server_entry : actions_by_server) {
+          const auto &reply = completed_responses[position];
+          const auto &regions = server_entry.second->actions_by_region();
+          if (reply.hasValue()) {
+            auto multi_response =
+                ResponseConverter::GetResults(multi_reqv[position], *reply.value(), regions);
+            OnComplete(regions, tries, server_entry.first, std::move(multi_response));
+          } else if (reply.hasException()) {
+            folly::exception_wrapper ew = reply.exception();
+            VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString()
+                    << " from server for action index:" << position;
+            OnError(regions, tries, ew, server_entry.first);
+          }
+          position++;
+        }
+      })
+      .onError([=](const folly::exception_wrapper &ew) {
+        VLOG(1) << "GetMultiResponse() exception: " << ew.what().toStdString();
+        std::lock_guard<std::recursive_mutex> lck(multi_mutex_);
+        for (const auto &server_entry : actions_by_server) {
+          OnError(server_entry.second->actions_by_region(), tries, ew, server_entry.first);
+        }
+      });
+}
+
+/**
+ * Processes the multi response of one server, region by region.
+ *
+ * Regions with results are handled per action. For a region without results, the region
+ * level exception decides whether its actions are failed or retried; a region with neither
+ * results nor an exception is failed as an invalid response. Every region is examined before
+ * the retryable actions are resubmitted. A region that must be failed therefore does not
+ * stop the processing of the remaining regions, nor the resubmission of actions already
+ * collected for retry.
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
+    const ActionsByRegion &actions_by_region, int32_t tries,
+    const std::shared_ptr<ServerName> server_name,
+    const std::unique_ptr<hbase::MultiResponse> multi_response) {
+  std::vector<std::shared_ptr<Action>> failed_actions;
+  // Bind by reference; copying the whole result map is not needed here.
+  const auto &region_results = multi_response->RegionResults();
+  for (const auto &action_by_region : actions_by_region) {
+    auto region_result_itr = region_results.find(action_by_region.first);
+    if (region_result_itr != region_results.end()) {
+      for (const auto &action : action_by_region.second->actions()) {
+        OnComplete(action, action_by_region.second, tries, server_name, region_result_itr->second,
+                   failed_actions);
+      }
+      continue;
+    }
+
+    auto region_exc = multi_response->RegionException(action_by_region.first);
+    if (region_exc == nullptr) {
+      // FailAll actions for this particular region as inconsistent server response. So we raise
+      // this exception to the application
+      std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+                            " sent us neither results nor exceptions for " +
+                            action_by_region.first;
+      VLOG(1) << err_msg;
+      auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+      FailAll(action_by_region.second->actions(), tries, ew, server_name);
+      continue;
+    }
+
+    // Eg: org.apache.hadoop.hbase.NotServingRegionException:
+    LogException(tries, action_by_region.second, *region_exc, server_name);
+    if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*region_exc)) {
+      FailAll(action_by_region.second->actions(), tries, *region_exc, server_name);
+      // Only this region is done; keep going so the other regions are handled too.
+      continue;
+    }
+    location_cache_->UpdateCachedLocation(*action_by_region.second->region_location(),
+                                          *region_exc);
+    AddError(action_by_region.second->actions(), *region_exc, server_name);
+    for (const auto &action : action_by_region.second->actions()) {
+      failed_actions.push_back(action);
+    }
+  }
+
+  if (!failed_actions.empty()) {
+    TryResubmit(failed_actions, tries);
+  }
+}
+
+/**
+ * Processes the outcome of a single action within a region result.
+ *
+ * An exception either fails the action (attempts exhausted or not retryable) or queues it in
+ * `failed_actions`; a result fulfils the promise of the action; an entry with neither is
+ * recorded as an error and queued in `failed_actions`.
+ *
+ * @throws std::runtime_error if looking up the entry of the action throws std::out_of_range
+ */
+template <typename REQ, typename RESP>
+void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete(
+    const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request,
+    int32_t tries, const std::shared_ptr<ServerName> &server_name,
+    const std::shared_ptr<RegionResult> &region_result,
+    std::vector<std::shared_ptr<Action>> &failed_actions) {
+  try {
+    auto result_or_exc = region_result->ResultOrException(action->original_index());
+    auto result = std::get<0>(*result_or_exc);
+    auto exc = std::get<1>(*result_or_exc);
+    if (exc != nullptr) {
+      LogException(tries, region_request, *exc, server_name);
+      if (tries >= max_attempts_ || !ExceptionUtil::ShouldRetry(*exc)) {
+        FailOne(action, tries, *exc, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name));
+      } else {
+        failed_actions.push_back(action);
+      }
+    } else if (result != nullptr) {
+      action2promises_[action->original_index()].setValue(std::move(result));
+    } else {
+      std::string err_msg = "Invalid response: Server " + server_name->ShortDebugString() +
+                            " sent us neither results nor exceptions for request @ index " +
+                            std::to_string(action->original_index()) + ", row " +
+                            action->action()->row() + " of " +
+                            region_request->region_location()->region_name();
+      VLOG(1) << err_msg;
+      auto ew = folly::make_exception_wrapper<std::runtime_error>(err_msg);
+      AddError(action, ew, server_name);
+      failed_actions.push_back(action);
+    }
+  } catch (const std::out_of_range &) {
+    // This should never occur. Error in logic. Throwing std::runtime_error from here. Will be
+    // retried or failed
+    std::string err_msg = "ResultOrException not present @ index " +
+                          std::to_string(action->original_index()) + ", row " +
+                          action->action()->row() + " of " +
+                          region_request->region_location()->region_name();
+    throw std::runtime_error(err_msg);
+  }
+}
+
+template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>,
+ std::shared_ptr<hbase::Result>>;  // explicit instantiation for Row requests / Result responses
+} /* namespace hbase */