You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/06/06 23:42:50 UTC

hbase git commit: HBASE-18173 Append class

Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 abeb09693 -> eec122526


HBASE-18173 Append class


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eec12252
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eec12252
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eec12252

Branch: refs/heads/HBASE-14850
Commit: eec1225268225d9de62fb98b90e2b75f035f5e5d
Parents: abeb096
Author: tedyu <yu...@gmail.com>
Authored: Tue Jun 6 16:42:45 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Jun 6 16:42:45 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/core/BUCK                 |  11 +++
 hbase-native-client/core/append-test.cc       | 106 +++++++++++++++++++++
 hbase-native-client/core/append.cc            |  54 +++++++++++
 hbase-native-client/core/append.h             |  56 +++++++++++
 hbase-native-client/core/client-test.cc       |  30 ++++++
 hbase-native-client/core/raw-async-table.cc   |  14 +++
 hbase-native-client/core/raw-async-table.h    |   2 +
 hbase-native-client/core/request-converter.cc |  12 +++
 hbase-native-client/core/request-converter.h  |   3 +
 hbase-native-client/core/table.cc             |   5 +
 hbase-native-client/core/table.h              |   6 ++
 11 files changed, 299 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 81fd4a7..47e97f5 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -43,6 +43,7 @@ cxx_library(
         "put.h",
         "delete.h",
         "scan.h",
+        "append.h",
         "result.h",
         "result-scanner.h",
         "request-converter.h",
@@ -82,6 +83,7 @@ cxx_library(
         "put.cc",
         "delete.cc",
         "scan.cc",
+        "append.cc",
         "scan-result-cache.cc",
         "raw-async-table.cc",
         "result.cc",
@@ -192,6 +194,15 @@ cxx_test(
     ],
     run_test_separately=True,)
 cxx_test(
+    name="append-test",
+    srcs=[
+        "append-test.cc",
+    ],
+    deps=[
+        ":core",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="retry-test",
     srcs=[
         "async-rpc-retrying-test.cc",

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/append-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append-test.cc b/hbase-native-client/core/append-test.cc
new file mode 100644
index 0000000..619826c
--- /dev/null
+++ b/hbase-native-client/core/append-test.cc
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "core/mutation.h"
+#include "core/append.h"
+#include "utils/time-util.h"
+
+using hbase::Append;
+using hbase::Cell;
+using hbase::CellType;
+using hbase::Mutation;
+using hbase::TimeUtil;
+
+const constexpr int64_t Mutation::kLatestTimestamp;
+
+TEST(Append, Row) {
+  // The row key supplied at construction must be reported back unchanged by row().
+  const std::string expected_row{"foo"};
+  Append append{expected_row};
+  EXPECT_EQ(expected_row, append.row());
+}
+
+TEST(Append, Durability) {
+  Append append{"row"};
+
+  // A freshly constructed Append is expected to report USE_DEFAULT.
+  const auto default_durability = hbase::pb::MutationProto_Durability_USE_DEFAULT;
+  EXPECT_EQ(default_durability, append.Durability());
+
+  // A durability set explicitly must be read back unchanged.
+  const auto skip_wal = hbase::pb::MutationProto_Durability_SKIP_WAL;
+  append.SetDurability(skip_wal);
+  EXPECT_EQ(skip_wal, append.Durability());
+}
+
+TEST(Append, Timestamp) {
+  Append append{"row"};
+
+  // Without an explicit timestamp the Append reports kLatestTimestamp.
+  EXPECT_EQ(Mutation::kLatestTimestamp, append.TimeStamp());
+
+  // A timestamp set on the Append is read back unchanged ...
+  const auto custom_ts = TimeUtil::ToMillis(TimeUtil::GetNowNanos());
+  append.SetTimeStamp(custom_ts);
+  EXPECT_EQ(custom_ts, append.TimeStamp());
+
+  // ... and is expected on a cell added afterwards through Add(family, qualifier, value).
+  append.Add("f", "q", "v");
+  const auto &family_cells = append.FamilyMap().at("f");
+  EXPECT_EQ(custom_ts, family_cells[0]->Timestamp());
+}
+
+TEST(Append, HasFamilies) {
+  Append append{"row"};
+
+  // No column has been added yet, so no family is expected.
+  EXPECT_FALSE(append.HasFamilies());
+
+  // Adding a single column is enough for HasFamilies() to report true.
+  append.Add("f", "q", "v");
+  EXPECT_TRUE(append.HasFamilies());
+}
+
+TEST(Append, Add) {
+  const CellType cell_type = CellType::PUT;
+  const std::string row{"row"};
+  const std::string family{"family"};
+  const std::string column{"column"};
+  const std::string value{"value"};
+  const int64_t timestamp = std::numeric_limits<int64_t>::max();
+
+  // Builds a fresh cell for `row` with the shared timestamp, value and type.
+  const auto make_cell = [&](const std::string &fam, const std::string &qual) {
+    return std::make_unique<Cell>(row, fam, qual, timestamp, value, cell_type);
+  };
+
+  // The first cell creates the family entry.
+  Append append{"row"};
+  append.Add(make_cell(family, column));
+  EXPECT_EQ(1, append.FamilyMap().size());
+  EXPECT_EQ(1, append.FamilyMap().at(family).size());
+
+  // A cell whose row differs from the Append's row must be rejected.
+  Append other_row_append{"foo"};
+  auto mismatched_cell = make_cell(family, column);
+  ASSERT_THROW(other_row_append.Add(std::move(mismatched_cell)), std::runtime_error);
+
+  // A second cell in the same family grows only that family's list.
+  append.Add(make_cell(family, "column-2"));
+  EXPECT_EQ(1, append.FamilyMap().size());
+  EXPECT_EQ(2, append.FamilyMap().at(family).size());
+
+  // A cell in a different family adds a second map entry.
+  append.Add(make_cell("family-2", "column-2"));
+  EXPECT_EQ(2, append.FamilyMap().size());
+  EXPECT_EQ(1, append.FamilyMap().at("family-2").size());
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/append.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append.cc b/hbase-native-client/core/append.cc
new file mode 100644
index 0000000..18ee45a
--- /dev/null
+++ b/hbase-native-client/core/append.cc
@@ -0,0 +1,54 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/append.h"
+#include <folly/Conv.h>
+#include <algorithm>
+#include <limits>
+#include <stdexcept>
+#include <utility>
+
+namespace hbase {
+
+/**
+ *  @brief Append to the column from the specific family with the specified qualifier.
+ *         The new cell is built with this Append's row and timestamp and CellType::PUT.
+ *  @param family family name
+ *  @param qualifier column qualifier
+ *  @param value value to append
+ *  @return reference to this Append, so calls can be chained
+ */
+Append& Append::Add(const std::string& family, const std::string& qualifier,
+        const std::string& value) {
+  // std::make_unique already yields a prvalue, so no std::move is needed to hand it over.
+  family_map_[family].push_back(
+      std::make_unique<Cell>(row_, family, qualifier, timestamp_, value, hbase::CellType::PUT));
+  return *this;
+}
+/**
+ *  @brief Add an already-built cell to this Append operation, taking ownership of it.
+ *  @param cell cell to add; must be non-null and its row must equal this Append's row
+ *  @return reference to this Append, so calls can be chained
+ *  @throws std::runtime_error if cell is null or its row differs from this Append's row
+ */
+Append& Append::Add(std::unique_ptr<Cell> cell) {
+  // Reject a null cell up front instead of dereferencing it below.
+  if (!cell) {
+    throw std::runtime_error("Cannot add a null cell to the Append for row " + row_);
+  }
+  if (cell->Row() != row_) {
+    throw std::runtime_error("The row in " + cell->DebugString() +
+                             " doesn't match the original one " + row_);
+  }
+
+  // std::move is only a cast here: the family key is read while `cell` still owns the Cell,
+  // and ownership is transferred inside push_back.
+  family_map_[cell->Family()].push_back(std::move(cell));
+  return *this;
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/append.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/append.h b/hbase-native-client/core/append.h
new file mode 100644
index 0000000..cf9ac24
--- /dev/null
+++ b/hbase-native-client/core/append.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+#include "core/cell.h"
+#include "core/mutation.h"
+
+namespace hbase {
+
+/**
+ * Mutation describing an append of values to columns of a single row.
+ * Append adds no data members of its own; all state lives in Mutation.
+ */
+class Append : public Mutation {
+ public:
+  /**
+   *  @brief Create an Append for the given row.
+   *  @param row row key handed to the Mutation base
+   */
+  explicit Append(const std::string& row) : Mutation(row) {}
+
+  /**
+   * Copy operations: copying an Append copies its Mutation base.
+   */
+  Append(const Append& other) = default;
+  Append& operator=(const Append& other) = default;
+
+  ~Append() = default;
+
+  /**
+   *  @brief Add the specified column and value to this Append operation.
+   *  @param family family name
+   *  @param qualifier column qualifier
+   *  @param value value to append
+   *  @return reference to this Append
+   */
+  Append& Add(const std::string& family, const std::string& qualifier, const std::string& value);
+
+  /**
+   *  @brief Add an already-built cell to this Append operation.
+   *  @param cell cell to add; ownership is taken by this Append
+   *  @return reference to this Append
+   */
+  Append& Add(std::unique_ptr<Cell> cell);
+};
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index d166f1c..1ee0a83 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -19,6 +19,7 @@
 
 #include <gtest/gtest.h>
 
+#include "core/append.h"
 #include "core/cell.h"
 #include "core/client.h"
 #include "core/configuration.h"
@@ -118,6 +119,35 @@ TEST_F(ClientTest, DefaultConfiguration) {
   client.Close();
 }
 
+TEST_F(ClientTest, Append) {
+  // Using TestUtil to create the table "t" with column family "d"
+  ClientTest::test_util->CreateTable("t", "d");
+
+  // Create TableName and Row to be appended to in HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test1";
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // First append: the returned value is expected to equal the appended value.
+  std::string val1 = "a";
+  auto result = table->Append(hbase::Append{row}.Add("d", "1", val1));
+
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ(row, result->Row());
+  EXPECT_EQ(val1, *(result->Value("d", "1")));
+
+  // Second append to the same cell: the returned value is the concatenation of both.
+  std::string val2 = "b";
+  result = table->Append(hbase::Append{row}.Add("d", "1", val2));
+
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ(row, result->Row());
+  EXPECT_EQ("ab", *(result->Value("d", "1")));
+
+  // Release the table and the client instead of leaving them open when the test ends.
+  table->Close();
+  client.Close();
+}
+
 TEST_F(ClientTest, PutGetDelete) {
   // Using TestUtil to populate test data
   std::string tableName = "t1";

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/raw-async-table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 998e2f1..2a98d54 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -124,6 +124,20 @@ folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   return caller->Call().then([caller](const auto r) { return r; });
 }
 
+folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append& append) {
+  auto caller =  // caller built for append.row() with the connection's write RPC timeout
+      CreateCallerBuilder<std::shared_ptr<Result>>(append.row(), connection_conf_->write_rpc_timeout())
+          ->action([=, &append](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<std::shared_ptr<Result>> {
+            return Call<hbase::Append, hbase::Request, hbase::Response, std::shared_ptr<Result>>(
+                rpc_client, controller, loc, append, &hbase::RequestConverter::AppendToMutateRequest,
+                &hbase::ResponseConverter::FromMutateResponse);  // request / response converters
+          })
+          ->Build();
+  // NOTE(review): `append` is captured by reference above; presumably it must outlive the future.
+  return caller->Call().then([caller](const auto r) { return r; });  // continuation copies `caller`
+}
 folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get(
     const std::vector<hbase::Get>& gets) {
   return this->Batch(gets);

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/raw-async-table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 8c40dae..6088d1b 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -61,6 +61,8 @@ class RawAsyncTable {
 
   folly::Future<folly::Unit> Delete(const hbase::Delete& del);
 
+  folly::Future<std::shared_ptr<hbase::Result>> Append(const hbase::Append& append);
+
   folly::Future<std::shared_ptr<hbase::Result>> Increment(const hbase::Increment& increment);
 
   folly::Future<folly::Unit> Put(const hbase::Put& put);

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/request-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 6eb2f04..54fdfc5 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -301,4 +301,16 @@ std::unique_ptr<Request> RequestConverter::IncrementToMutateRequest(const Increm
     return pb_req;
 }
 
+/**
+ * Builds a mutate Request for the given Append, addressed to the given region.
+ * The Append is converted with ToMutation using the APPEND mutation type.
+ */
+std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &append,
+        const std::string &region_name) {
+    // Value handed to ToMutation as its nonce argument.
+    const int64_t nonce = -1;
+
+    auto request = Request::mutate();
+    auto mutate_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(request->req_msg());
+    RequestConverter::SetRegion(region_name, mutate_msg->mutable_region());
+
+    // release() hands the raw pointer to set_allocated_mutation.
+    auto mutation = ToMutation(MutationType::MutationProto_MutationType_APPEND, append, nonce);
+    mutate_msg->set_allocated_mutation(mutation.release());
+
+    VLOG(3) << "Req is " << request->req_msg()->ShortDebugString();
+    return request;
+}
 } /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/request-converter.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index c807f45..a9d65d6 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -25,6 +25,7 @@
 #include "connection/request.h"
 #include "core/action.h"
 #include "core/cell.h"
+#include "core/append.h"
 #include "core/delete.h"
 #include "core/get.h"
 #include "core/increment.h"
@@ -90,6 +91,8 @@ class RequestConverter {
                                                        const Mutation &mutation,
                                                        const int64_t nonce);
 
+  static std::unique_ptr<Request> AppendToMutateRequest(const Append &append,
+          const std::string &region_name);
  private:
   // Constructor not required. We have all static methods to create PB requests.
   RequestConverter();

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index 9cdd8b0..bf22169 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -83,6 +83,11 @@ std::shared_ptr<hbase::Result> Table::Increment(const hbase::Increment &incremen
   return context.get(operation_timeout());
 }
 
+std::shared_ptr<hbase::Result> Table::Append(const hbase::Append &append) {
+  // Start the append on the async table, then fetch its result with operation_timeout().
+  auto pending = async_table_->Append(append);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
 milliseconds Table::operation_timeout() const {
   return TimeUtil::ToMillis(async_connection_->connection_conf()->operation_timeout());
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eec12252/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 1f6d9b7..781c6f1 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -73,6 +73,12 @@ class Table {
    * @param - increment Increment object to perform HBase Increment operation.
    */
   std::shared_ptr<hbase::Result> Increment(const hbase::Increment &increment);
+
+  /**
+   * @brief - Appends some data in the table.
+   * @param - append Append object to perform HBase Append operation.
+   */
+  std::shared_ptr<hbase::Result> Append(const hbase::Append &append);
   // TODO: Batch Puts
 
   std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan);