You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by wd...@apache.org on 2019/01/29 17:45:08 UTC
[kudu] 01/02: [tools] Add table scan tool
This is an automated email from the ASF dual-hosted git repository.
wdberkeley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 0afeddf9e530762e0e47beb7428982763715c746
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Sat Jan 5 03:41:50 2019 -0500
[tools] Add table scan tool
This commit adds a basic tool to scan rows from a table. Several
predicates can be specified on the query. Unlike traditional SQL
syntax, the scan tool's simple query predicates are represented in a
simple JSON syntax. Three types of predicates are supported, including
'Comparison', 'InList' and 'IsNull'.
* The 'Comparison' type supports <=, <, =, > and >=,
which can be represented as '[operator, column_name, value]',
e.g. '[">=", "col1", "value"]'
* The 'InList' type can be represented as
'["IN", column_name, [value1, value2, ...]]'
e.g. '["IN", "col2", ["value1", "value2"]]'
* The 'IsNull' type determines whether the value is NULL or not,
which can be represented as '[operator, column_name]'
e.g. '["NULL", "col1"]', or '["NOTNULL", "col2"]'
Predicates can be combined together with predicate operators using the syntax
[operator, predicate, predicate, ..., predicate].
For example,
["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]]
The only supported predicate operator is `AND`.
Change-Id: Ieac340b70a9eaf131f82a2b7d61336211d1d48f8
Reviewed-on: http://gerrit.cloudera.org:8080/12167
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
---
src/kudu/client/scanner-internal.h | 3 +
src/kudu/tools/CMakeLists.txt | 1 +
src/kudu/tools/kudu-tool-test.cc | 185 +++++++++++++++++
src/kudu/tools/table_scanner.cc | 380 ++++++++++++++++++++++++++++++++++
src/kudu/tools/table_scanner.h | 67 ++++++
src/kudu/tools/tool_action_cluster.cc | 4 +-
src/kudu/tools/tool_action_common.cc | 6 +
src/kudu/tools/tool_action_perf.cc | 4 +-
src/kudu/tools/tool_action_table.cc | 73 +++++--
9 files changed, 694 insertions(+), 29 deletions(-)
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 5e7652b..7b233bf 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -311,6 +311,9 @@ class KuduScanBatch::Data {
<< row_format_flags_;
DCHECK_GE(idx, 0);
DCHECK_LT(idx, num_rows());
+ if (direct_data_.empty()) {
+ return KuduRowResult(projection_, nullptr);
+ }
int offset = idx * projected_row_size_;
return KuduRowResult(projection_, &direct_data_[offset]);
}
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index 1be02ca..ebe894c 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(kudu_tools_util
color.cc
data_gen_util.cc
diagnostics_log_parser.cc
+ table_scanner.cc
tool_action.cc
tool_action_common.cc
)
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index b8fd9d6..1c0633f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -47,6 +47,7 @@
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
@@ -140,10 +141,12 @@ using kudu::cfile::StringDataGenerator;
using kudu::cfile::WriterOptions;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
+using kudu::client::KuduInsert;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;
using kudu::cluster::ExternalMiniCluster;
@@ -181,9 +184,11 @@ using std::back_inserter;
using std::copy;
using std::make_pair;
using std::map;
+using std::max;
using std::ostringstream;
using std::pair;
using std::string;
+using std::to_string;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
@@ -378,6 +383,41 @@ class ToolTest : public KuduTest {
return Status::OK();
}
+  // Runs 'kudu table scan' on 'table_name' with the given JSON predicates and
+  // the projection described by 'columns', then checks the tool's output:
+  //  - for every key in [min_value, max_value] some output line matches the
+  //    projected row; only the "key" column is matched by value, every other
+  //    projected column matches any value;
+  //  - some output line reports a total count of max_value - min_value + 1,
+  //    clamped at zero.
+  // Each entry of 'columns' is a (type name, column name) pair.
+  void RunScanTableCheck(const string& table_name,
+                         const string& predicates_json,
+                         int64_t min_value,
+                         int64_t max_value,
+                         const vector<pair<string, string>>& columns = {{"int32", "key"}}) {
+    vector<string> col_names;
+    col_names.reserve(columns.size());
+    for (const auto& column : columns) {
+      col_names.push_back(column.second);
+    }
+    const string projection = JoinStrings(col_names, ",");
+
+    // Name the template argument explicitly: int64_t is not 'long' on every
+    // platform, so mixing it with a '0L' literal can fail type deduction.
+    const int64_t total = max<int64_t>(max_value - min_value + 1, 0);
+    vector<string> lines;
+    NO_FATALS(RunActionStdoutLines(
+        Substitute("table scan $0 $1 -show_value=true "
+                   "-columns=$2 -predicates=$3",
+                   cluster_->master()->bound_rpc_addr().ToString(),
+                   table_name, projection, predicates_json), &lines));
+    for (int64_t value = min_value; value <= max_value; ++value) {
+      // Check projection.
+      vector<string> kvs;
+      kvs.reserve(columns.size());
+      for (const auto& column : columns) {
+        // Check matched rows.
+        kvs.push_back(Substitute("$0 $1=$2",
+            column.first, column.second, column.second == "key" ? to_string(value) : ".*"));
+      }
+      string line_pattern(R"*(\()*");
+      line_pattern += JoinStrings(kvs, ", ");
+      line_pattern += (")");
+      ASSERT_STRINGS_ANY_MATCH(lines, line_pattern);
+    }
+    // Check total count.
+    ASSERT_STRINGS_ANY_MATCH(lines, Substitute("Total count $0 ", total));
+  }
+
protected:
void RunLoadgen(int num_tservers = 1,
const vector<string>& tool_args = {},
@@ -589,6 +629,7 @@ TEST_F(ToolTest, TestModeHelp) {
"rename_table.*Rename a table",
"rename_column.*Rename a column",
"list.*List tables",
+ "scan.*Scan rows from a table",
};
NO_FATALS(RunTestHelp("table", kTableModeRegexes));
}
@@ -2130,6 +2171,7 @@ TEST_F(ToolTest, TestMasterList) {
// (2)rename a table
// (3)rename a column
// (4)list tables
+// (5)scan a table
TEST_F(ToolTest, TestDeleteTable) {
NO_FATALS(StartExternalMiniCluster());
shared_ptr<KuduClient> client;
@@ -2311,6 +2353,149 @@ TEST_F(ToolTest, TestListTables) {
}
}
+// Exercises the predicate forms accepted by 'kudu table scan': no predicate,
+// "=", ">", ">=", "<", "<=", "IN", "NULL", "NOTNULL", and a conjunction of
+// several of them.
+TEST_F(ToolTest, TestScanTablePredicates) {
+  NO_FATALS(StartExternalMiniCluster());
+
+  const string kTableName = "kudu.table.scan.predicates";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 10);
+  });
+  ww.StopAndJoin();
+  int64_t total_rows = ww.rows_inserted();
+
+  // Insert one more row with a NULL value column: only 'key' and 'int_val'
+  // are set, 'string_val' is left unset.
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
+  shared_ptr<KuduSession> session = client->NewSession();
+  session->SetTimeoutMillis(20000);
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  unique_ptr<KuduInsert> insert(table->NewInsert());
+  ASSERT_OK(insert->mutable_row()->SetInt32("key", ++total_rows));
+  ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 1));
+  ASSERT_OK(session->Apply(insert.release()));
+  ASSERT_OK(session->Flush());
+
+  // Check predicates. RunScanTableCheck() reports problems through ASSERT_*
+  // macros, which only return from the helper itself; NO_FATALS makes a fatal
+  // failure in the helper stop this test as well.
+  NO_FATALS(RunScanTableCheck(kTableName, "", 1, total_rows));
+  NO_FATALS(RunScanTableCheck(kTableName, R"*(["AND",["=","key",1]])*", 1, 1));
+  const int64_t mid = total_rows / 2;
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              Substitute(R"*(["AND",[">","key",$0]])*", mid),
+                              mid + 1, total_rows));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              Substitute(R"*(["AND",[">=","key",$0]])*", mid),
+                              mid, total_rows));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              Substitute(R"*(["AND",["<","key",$0]])*", mid),
+                              1, mid - 1));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              Substitute(R"*(["AND",["<=","key",$0]])*", mid),
+                              1, mid));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              R"*(["AND",["IN","key",[1,2,3,4,5]]])*",
+                              1, 5));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              R"*(["AND",["NOTNULL","string_val"]])*",
+                              1, total_rows - 1));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              R"*(["AND",["NULL","string_val"]])*",
+                              total_rows, total_rows));
+  NO_FATALS(RunScanTableCheck(kTableName,
+                              R"*(["AND",["IN","key",[0,1,2,3]],)*"
+                              R"*(["<","key",8],[">=","key",1],["NOTNULL","key"],)*"
+                              R"*(["NOTNULL","string_val"]])*",
+                              1, 3));
+}
+
+// Scans a single row (key == 1) with several different -columns projections,
+// including an empty one, and checks the printed row against each projection.
+TEST_F(ToolTest, TestScanTableProjection) {
+  NO_FATALS(StartExternalMiniCluster());
+
+  const string kTableName = "kudu.table.scan.projection";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 10);
+  });
+  ww.StopAndJoin();
+
+  // Check projections. RunScanTableCheck() uses ASSERT_* internally, so each
+  // call is wrapped in NO_FATALS to stop the test at the first fatal failure.
+  const string one_row_json = R"*(["AND",["=","key",1]])*";
+  NO_FATALS(RunScanTableCheck(kTableName, one_row_json, 1, 1, {}));
+  NO_FATALS(RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"}}));
+  NO_FATALS(RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"string", "string_val"}}));
+  NO_FATALS(RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"},
+                                                               {"string", "string_val"}}));
+  NO_FATALS(RunScanTableCheck(kTableName, one_row_json, 1, 1, {{"int32", "key"},
+                                                               {"int32", "int_val"},
+                                                               {"string", "string_val"}}));
+}
+
+// Scans with several predicates on two columns at once and checks that every
+// printed row has a key in (mid, total_rows].
+TEST_F(ToolTest, TestScanTableMultiPredicates) {
+  NO_FATALS(StartExternalMiniCluster());
+
+  const string kTableName = "kudu.table.scan.multipredicates";
+
+  // Create the src table and write some data to it.
+  TestWorkload ww(cluster_.get());
+  ww.set_table_name(kTableName);
+  ww.set_num_replicas(1);
+  ww.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+  ww.set_num_write_threads(1);
+  ww.Setup();
+  ww.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(ww.rows_inserted(), 1000);
+  });
+  ww.StopAndJoin();
+  const int64_t total_rows = ww.rows_inserted();
+  const int64_t mid = total_rows / 2;
+
+  vector<string> lines;
+  NO_FATALS(RunActionStdoutLines(
+      Substitute("table scan $0 $1 -show_value=true "
+                 "-columns=key,string_val -predicates=$2",
+                 cluster_->master()->bound_rpc_addr().ToString(),
+                 kTableName,
+                 Substitute(R"*(["AND",[">","key",$0],)*"
+                            R"*(["<=","key",$1],)*"
+                            R"*([">=","string_val","a"],)*"
+                            R"*(["<","string_val","b"]])*", mid, total_rows)),
+      &lines));
+  // The key column is an int32 (the other scan tests insert and match it as
+  // "int32 key"), so row lines start with "(int32 key=". Searching for an
+  // "int64" tag would never match and would skip every check below.
+  const string kKeyPrefix = "(int32 key=";
+  for (const auto& line : lines) {
+    const size_t prefix_pos = line.find(kKeyPrefix);
+    if (prefix_pos == string::npos) {
+      // Not a row line (e.g. a per-tablet or total-count summary).
+      continue;
+    }
+    const size_t key_begin = prefix_pos + kKeyPrefix.size();
+    // NOTE(review): assumes string values are printed unquoted right after
+    // "string_val=" -- confirm against the row's ToString() format.
+    const size_t key_end = line.find(", string string_val=a", key_begin);
+    ASSERT_NE(key_end, string::npos);
+    // substr() takes (position, length): extract only the key's digits.
+    int32_t key;
+    ASSERT_TRUE(safe_strto32(line.substr(key_begin, key_end - key_begin).c_str(), &key));
+    ASSERT_GT(key, mid);
+    ASSERT_LE(key, total_rows);
+  }
+  ASSERT_LE(lines.size(), mid);
+}
+
Status CreateLegacyHmsTable(HmsClient* client,
const string& hms_database_name,
const string& hms_table_name,
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc
new file mode 100644
index 0000000..a2f2e7b
--- /dev/null
+++ b/src/kudu/tools/table_scanner.cc
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/table_scanner.h"
+
+#include <stddef.h>
+
+#include <iostream>
+#include <map>
+#include <memory>
+#include <set>
+
+#include <boost/bind.hpp>
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <rapidjson/document.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/scan_batch.h"
+#include "kudu/client/scan_predicate.h"
+#include "kudu/client/schema.h"
+#include "kudu/client/value.h"
+#include "kudu/common/column_predicate.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/string_case.h"
+
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduValue;
+using strings::Substitute;
+using std::cout;
+using std::endl;
+using std::map;
+using std::set;
+using std::unique_ptr;
+
+DECLARE_string(columns);
+DEFINE_bool(fill_cache, true,
+ "Whether to fill block cache when scanning.");
+DECLARE_int32(num_threads);
+
+// Help text for --predicates. The operator tokens listed here must match the
+// ones accepted by ParsePredicateType(); equality is spelled "=", not "==".
+DEFINE_string(predicates, "",
+              "Query predicates on columns. Unlike traditional SQL syntax, "
+              "the scan tool's simple query predicates are represented in a "
+              "simple JSON syntax. Three types of predicates are supported, "
+              "including 'Comparison', 'InList' and 'IsNull'.\n"
+              " * The 'Comparison' type support <=, <, =, > and >=,\n"
+              " which can be represented as '[operator, column_name, value]',""\n"
+              R"*( e.g. '[">=", "col1", "value"]')*""\n"
+              " * The 'InList' type can be represented as\n"
+              R"*( '["IN", column_name, [value1, value2, ...]]')*""\n"
+              R"*( e.g. '["IN", "col2", ["value1", "value2"]]')*""\n"
+              " * The 'IsNull' type determine whether the value is NULL or not,\n"
+              " which can be represented as '[operator, column_name]'\n"
+              R"*( e.g. '["NULL", "col1"]', or '["NOTNULL", "col2"]')*""\n"
+              "Predicates can be combined together with predicate operators using the syntax\n"
+              " [operator, predicate, predicate, ..., predicate].\n"
+              "For example,\n"
+              R"*( ["AND", [">=", "col1", "value"], ["NOTNULL", "col2"]])*""\n"
+              "The only supported predicate operator is `AND`.");
+DEFINE_bool(show_value, false,
+ "Whether to show values of scanned rows.");
+DECLARE_string(tablets);
+
+namespace kudu {
+namespace tools {
+
+// Maps the operator token of a JSON predicate (compared case-insensitively)
+// to a PredicateType:
+//   "="                   -> Equality
+//   "<", "<=", ">", ">="  -> Range
+//   "NULL"                -> IsNull
+//   "NOTNULL"             -> IsNotNull
+//   "IN"                  -> InList
+// Any other token yields PredicateType::None. The token comes straight from
+// the user's --predicates flag, so an unknown one is reported through the
+// return value for the caller to turn into an error, rather than aborting
+// the process.
+PredicateType ParsePredicateType(const string& predicate_type) {
+  string predicate_type_uc;
+  ToUpperCase(predicate_type, &predicate_type_uc);
+  if (predicate_type_uc == "=") {
+    return PredicateType::Equality;
+  }
+  if (predicate_type_uc == "<" ||
+      predicate_type_uc == "<=" ||
+      predicate_type_uc == ">" ||
+      predicate_type_uc == ">=") {
+    return PredicateType::Range;
+  }
+  if (predicate_type_uc == "NULL") {
+    return PredicateType::IsNull;
+  }
+  if (predicate_type_uc == "NOTNULL") {
+    return PredicateType::IsNotNull;
+  }
+  if (predicate_type_uc == "IN") {
+    return PredicateType::InList;
+  }
+  return PredicateType::None;
+}
+
+// Builds a KuduValue from the JSON 'value' for a column of the given 'type'.
+// The JSON value must have the matching JSON type (checked with CHECK):
+// an int for INT8/INT16/INT32, an int64 for INT64, a string for STRING,
+// a bool for BOOL and a double for FLOAT/DOUBLE. Any other column type is
+// fatal.
+KuduValue* ParseValue(KuduColumnSchema::DataType type,
+                      const rapidjson::Value* value) {
+  using DataType = KuduColumnSchema::DataType;
+
+  CHECK(value != nullptr);
+  if (type == DataType::INT8 || type == DataType::INT16 || type == DataType::INT32) {
+    CHECK(value->IsInt());
+    return KuduValue::FromInt(value->GetInt());
+  }
+  if (type == DataType::INT64) {
+    CHECK(value->IsInt64());
+    return KuduValue::FromInt(value->GetInt64());
+  }
+  if (type == DataType::STRING) {
+    CHECK(value->IsString());
+    return KuduValue::CopyString(value->GetString());
+  }
+  if (type == DataType::BOOL) {
+    CHECK(value->IsBool());
+    return KuduValue::FromBool(value->GetBool());
+  }
+  if (type == DataType::FLOAT) {
+    CHECK(value->IsDouble());
+    return KuduValue::FromFloat(static_cast<float>(value->GetDouble()));
+  }
+  if (type == DataType::DOUBLE) {
+    CHECK(value->IsDouble());
+    return KuduValue::FromDouble(value->GetDouble());
+  }
+
+  LOG(FATAL) << Substitute("unhandled data type $0", type);
+  return nullptr;
+}
+
+// Creates a comparison predicate "<column_name> <predicate_type> <value>" on
+// 'table'.
+//
+// 'predicate_type' must be one of "<", "<=", "=", ">" or ">="; any other
+// token makes the function return nullptr. 'value' is converted with
+// ParseValue() according to the column's 'type'.
+//
+// The operator is resolved before the value is converted, so the early
+// nullptr return never leaves a freshly created KuduValue without an owner.
+KuduPredicate* NewComparisonPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                                      KuduColumnSchema::DataType type,
+                                      const string& predicate_type,
+                                      const string& column_name,
+                                      const rapidjson::Value* value) {
+  client::KuduPredicate::ComparisonOp cop;
+  if (predicate_type == "<") {
+    cop = client::KuduPredicate::ComparisonOp::LESS;
+  } else if (predicate_type == "<=") {
+    cop = client::KuduPredicate::ComparisonOp::LESS_EQUAL;
+  } else if (predicate_type == "=") {
+    cop = client::KuduPredicate::ComparisonOp::EQUAL;
+  } else if (predicate_type == ">") {
+    cop = client::KuduPredicate::ComparisonOp::GREATER;
+  } else if (predicate_type == ">=") {
+    cop = client::KuduPredicate::ComparisonOp::GREATER_EQUAL;
+  } else {
+    return nullptr;
+  }
+
+  KuduValue* kudu_value = ParseValue(type, value);
+  CHECK(kudu_value != nullptr);
+  return table->NewComparisonPredicate(column_name, cop, kudu_value);
+}
+
+// Creates an IS NULL or IS NOT NULL predicate on 'column_name', depending on
+// 'pt'. Returns nullptr when 'pt' is neither IsNull nor IsNotNull.
+KuduPredicate* NewIsNullPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                                  const string& column_name,
+                                  PredicateType pt) {
+  if (pt == PredicateType::IsNotNull) {
+    return table->NewIsNotNullPredicate(column_name);
+  }
+  if (pt == PredicateType::IsNull) {
+    return table->NewIsNullPredicate(column_name);
+  }
+  return nullptr;
+}
+
+// Creates an IN-list predicate on column 'name' of 'table'.
+//
+// 'object' must be a JSON array (checked with CHECK); each of its elements is
+// converted with ParseValue() according to the column's 'type'. 'reader' is
+// the JsonReader that parsed 'object' and is used to extract its elements.
+KuduPredicate* NewInListPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                                  KuduColumnSchema::DataType type,
+                                  const string& name,
+                                  const JsonReader& reader,
+                                  const rapidjson::Value* object) {
+  CHECK(object->IsArray());
+  vector<const rapidjson::Value*> values;
+  // Do not silently drop an extraction failure: the predicate would otherwise
+  // be built from an incomplete (possibly empty) list of values.
+  CHECK_OK(reader.ExtractObjectArray(object, nullptr, &values));
+  vector<KuduValue*> kudu_values;
+  kudu_values.reserve(values.size());
+  for (const auto* value : values) {
+    kudu_values.emplace_back(ParseValue(type, value));
+  }
+  return table->NewInListPredicate(name, &kudu_values);
+}
+
+// Builds one predicate from its parsed JSON parts and adds it to 'builder'.
+//
+//   table          - table being scanned; supplies the column's data type.
+//   predicate_type - operator token, see ParsePredicateType().
+//   column_name    - column the predicate applies to.
+//   value          - third element of the JSON predicate, if present.
+//   reader         - JsonReader that parsed 'value' (used for IN lists).
+//   builder        - scan token builder receiving the predicate.
+//
+// Returns OK without adding anything when 'predicate_type' or 'column_name'
+// is empty, NotFound when the column does not exist, NotSupported for an
+// unknown operator, and InvalidArgument when the presence or shape of
+// 'value' does not fit the operator. These conditions all stem from the
+// user's --predicates flag, so they are reported as errors instead of
+// CHECK-failing.
+Status AddPredicate(const client::sp::shared_ptr<KuduTable>& table,
+                    const string& predicate_type,
+                    const string& column_name,
+                    const boost::optional<const rapidjson::Value*>& value,
+                    const JsonReader& reader,
+                    KuduScanTokenBuilder& builder) {
+  if (predicate_type.empty() || column_name.empty()) {
+    return Status::OK();
+  }
+
+  Schema schema_internal = KuduSchema::ToSchema(table->schema());
+  int idx = schema_internal.find_column(column_name);
+  if (PREDICT_FALSE(idx == Schema::kColumnNotFound)) {
+    return Status::NotFound("no such column", column_name);
+  }
+  auto type = table->schema().Column(static_cast<size_t>(idx)).type();
+  KuduPredicate* predicate = nullptr;
+  PredicateType pt = ParsePredicateType(predicate_type);
+  switch (pt) {
+    case PredicateType::Equality:
+    case PredicateType::Range:
+      if (!value) {
+        return Status::InvalidArgument(
+            Substitute("predicate '$0' on column '$1' requires a value",
+                       predicate_type, column_name));
+      }
+      predicate = NewComparisonPredicate(table, type, predicate_type, column_name, value.get());
+      break;
+    case PredicateType::IsNotNull:
+    case PredicateType::IsNull:
+      if (value) {
+        return Status::InvalidArgument(
+            Substitute("predicate '$0' on column '$1' does not take a value",
+                       predicate_type, column_name));
+      }
+      predicate = NewIsNullPredicate(table, column_name, pt);
+      break;
+    case PredicateType::InList: {
+      if (!value || !value.get()->IsArray()) {
+        return Status::InvalidArgument(
+            Substitute("predicate '$0' on column '$1' requires an array of values",
+                       predicate_type, column_name));
+      }
+      predicate = NewInListPredicate(table, type, column_name, reader, value.get());
+      break;
+    }
+    default:
+      return Status::NotSupported(Substitute("not support predicate_type $0", predicate_type));
+  }
+  CHECK(predicate);
+  RETURN_NOT_OK(builder.AddConjunctPredicate(predicate));
+
+  return Status::OK();
+}
+
+// Parses the --predicates flag and adds every predicate it describes to
+// 'builder'.
+//
+// The flag holds a JSON array of the form
+//   [operator, predicate, predicate, ...]
+// where 'operator' must be "AND" (case-insensitive) and each predicate is an
+// array of two or three elements whose first two are strings (operator and
+// column name). An empty flag, or an empty top-level array, adds nothing.
+//
+// Returns InvalidArgument for malformed input and propagates any error from
+// JSON parsing or from AddPredicate().
+Status AddPredicates(const client::sp::shared_ptr<KuduTable>& table,
+                     KuduScanTokenBuilder& builder) {
+  if (FLAGS_predicates.empty()) {
+    return Status::OK();
+  }
+  JsonReader reader(FLAGS_predicates);
+  RETURN_NOT_OK(reader.Init());
+  vector<const rapidjson::Value*> predicate_objects;
+  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
+                                          nullptr,
+                                          &predicate_objects));
+  if (predicate_objects.empty()) {
+    return Status::OK();
+  }
+
+  // The first element is the operator combining the predicates that follow.
+  if (!predicate_objects[0]->IsString()) {
+    return Status::InvalidArgument("the first element of the predicates must be a string");
+  }
+  string op;
+  ToUpperCase(predicate_objects[0]->GetString(), &op);
+  if (op != "AND") {
+    return Status::InvalidArgument("only 'AND' operator is supported now");
+  }
+
+  for (size_t i = 1; i < predicate_objects.size(); ++i) {
+    const rapidjson::Value* predicate_object = predicate_objects[i];
+    if (!predicate_object->IsArray()) {
+      return Status::InvalidArgument(Substitute("predicate at index $0 is not an array", i));
+    }
+    vector<const rapidjson::Value*> elements;
+    RETURN_NOT_OK(reader.ExtractObjectArray(predicate_object, nullptr, &elements));
+    if (elements.size() != 2 && elements.size() != 3) {
+      return Status::InvalidArgument(
+          Substitute("invalid predicate elements count $0", elements.size()));
+    }
+    if (!elements[0]->IsString() || !elements[1]->IsString()) {
+      return Status::InvalidArgument(
+          Substitute("predicate at index $0 must start with two strings "
+                     "(operator and column name)", i));
+    }
+
+    // Only three-element predicates carry a value.
+    boost::optional<const rapidjson::Value*> value;
+    if (elements.size() == 3) {
+      value = elements[2];
+    }
+    RETURN_NOT_OK(AddPredicate(table,
+                               elements[0]->GetString(),
+                               elements[1]->GetString(),
+                               value,
+                               reader,
+                               builder));
+  }
+
+  return Status::OK();
+}
+
+// Scans every token in 'tokens' sequentially on the calling thread.
+//
+// For each token: turns it into a scanner, reads all batches, adds the number
+// of rows to both a per-token count and the shared total_count_, prints each
+// row when --show_value is set, and finally prints the token's tablet id, row
+// count and elapsed wall time. Any scanner error is fatal (CHECK_OK).
+void TableScanner::ScannerTask(const vector<KuduScanToken *>& tokens) {
+  for (const auto* token : tokens) {
+    Stopwatch sw(Stopwatch::THIS_THREAD);
+    sw.start();
+
+    // Hold the scanner in a unique_ptr so it is released on every path out of
+    // this iteration instead of relying on a trailing manual delete.
+    KuduScanner* raw_scanner = nullptr;
+    CHECK_OK(token->IntoKuduScanner(&raw_scanner));
+    unique_ptr<KuduScanner> scanner(raw_scanner);
+    CHECK_OK(scanner->Open());
+
+    uint64_t count = 0;
+    while (scanner->HasMoreRows()) {
+      KuduScanBatch batch;
+      CHECK_OK(scanner->NextBatch(&batch));
+      count += batch.NumRows();
+      total_count_.IncrementBy(batch.NumRows());
+      if (FLAGS_show_value) {
+        for (const auto& row : batch) {
+          cout << row.ToString() << endl;
+        }
+      }
+    }
+    // Destroy the scanner before stopping the timer, as the manual delete did.
+    scanner.reset();
+
+    sw.stop();
+    cout << "T " << token->tablet().id() << " scanned count " << count
+         << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
+  }
+}
+
+// Logs the running total of scanned rows about every five seconds for as long
+// as the thread pool reports more than one thread, polling every 100ms.
+void TableScanner::MonitorTask() {
+  MonoTime last_report_time = MonoTime::Now();
+  for (;;) {
+    // Some other table scan thread is running while the pool has more than
+    // one thread; stop monitoring once that is no longer the case.
+    if (!(thread_pool_->num_threads() > 1)) {
+      break;
+    }
+    const bool report_due =
+        MonoTime::Now() - last_report_time >= MonoDelta::FromSeconds(5);
+    if (report_due) {
+      LOG(INFO) << "Scanned count: " << total_count_.Load() << endl;
+      last_report_time = MonoTime::Now();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+}
+
+// Scans the table named table_name_ and prints the total row count and the
+// elapsed wall time.
+//
+// Builds scan tokens honoring --fill_cache, --columns and --predicates, keeps
+// the tokens whose tablet id is listed in --tablets (all tokens when the flag
+// is empty), distributes them round-robin over --num_threads scanner tasks,
+// and runs those tasks plus one monitor task on a thread pool.
+//
+// Returns InvalidArgument when --num_threads is less than 1, and otherwise
+// the first error hit while opening the table, building the tokens, creating
+// the pool or submitting a task.
+Status TableScanner::Run() {
+  // The thread count is used as a modulus below, so it must be positive.
+  if (FLAGS_num_threads < 1) {
+    return Status::InvalidArgument(
+        Substitute("--num_threads must be at least 1, got $0", FLAGS_num_threads));
+  }
+
+  client::sp::shared_ptr<KuduTable> table;
+  RETURN_NOT_OK(client_->OpenTable(table_name_, &table));
+
+  KuduScanTokenBuilder builder(table.get());
+  RETURN_NOT_OK(builder.SetCacheBlocks(FLAGS_fill_cache));
+  RETURN_NOT_OK(builder.SetSelection(KuduClient::LEADER_ONLY));
+  RETURN_NOT_OK(builder.SetReadMode(KuduScanner::READ_LATEST));
+  RETURN_NOT_OK(builder.SetTimeoutMillis(30000));
+
+  vector<string> projected_column_names = Split(FLAGS_columns, ",", strings::SkipEmpty());
+  RETURN_NOT_OK(builder.SetProjectedColumnNames(projected_column_names));
+  RETURN_NOT_OK(AddPredicates(table, builder));
+
+  vector<KuduScanToken*> tokens;
+  ElementDeleter deleter(&tokens);
+  RETURN_NOT_OK(builder.Build(&tokens));
+
+  // Distribute the selected tokens round-robin over the scanner tasks.
+  const set<string>& tablet_id_filters = Split(FLAGS_tablets, ",", strings::SkipWhitespace());
+  vector<vector<KuduScanToken*>> thread_tokens(FLAGS_num_threads);
+  int next = 0;
+  for (auto* token : tokens) {
+    if (tablet_id_filters.empty() || ContainsKey(tablet_id_filters, token->tablet().id())) {
+      thread_tokens[next++ % FLAGS_num_threads].push_back(token);
+    }
+  }
+
+  RETURN_NOT_OK(ThreadPoolBuilder("table_scan_pool")
+                .set_max_threads(FLAGS_num_threads + 1)  // add extra 1 thread for MonitorTask
+                .set_idle_timeout(MonoDelta::FromMilliseconds(1))
+                .Build(&thread_pool_));
+
+  Stopwatch sw(Stopwatch::THIS_THREAD);
+  sw.start();
+  Status submit_status;
+  for (int t = 0; t < FLAGS_num_threads && submit_status.ok(); ++t) {
+    submit_status = thread_pool_->SubmitFunc(
+        boost::bind(&TableScanner::ScannerTask, this, thread_tokens[t]));
+  }
+  if (submit_status.ok()) {
+    submit_status = thread_pool_->SubmitFunc(boost::bind(&TableScanner::MonitorTask, this));
+  }
+  // Drain the pool even when a submission failed: tasks that were already
+  // submitted use 'this' and the tokens, and the tokens are deleted as soon
+  // as this function returns.
+  thread_pool_->Wait();
+  thread_pool_->Shutdown();
+  RETURN_NOT_OK(submit_status);
+
+  sw.stop();
+  cout << "Total count " << total_count_.Load()
+       << " cost " << sw.elapsed().wall_seconds() << " seconds" << endl;
+
+  return Status::OK();
+}
+
+} // namespace tools
+} // namespace kudu
diff --git a/src/kudu/tools/table_scanner.h b/src/kudu/tools/table_scanner.h
new file mode 100644
index 0000000..c9589c5
--- /dev/null
+++ b/src/kudu/tools/table_scanner.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+namespace client {
+class KuduClient;
+class KuduScanToken;
+} // namespace client
+} // namespace kudu
+
+using kudu::client::KuduClient;
+using kudu::client::KuduScanToken;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace tools {
+// Scans a table through the given client; Run() is the only public entry
+// point. ScannerTask() and MonitorTask() are the functions Run() submits to
+// thread_pool_.
+class TableScanner {
+ public:
+  TableScanner(client::sp::shared_ptr<KuduClient> client, string table_name)
+      : total_count_(0),
+        client_(std::move(client)),
+        table_name_(std::move(table_name)) {}
+
+  // Runs the scan and returns its status.
+  Status Run();
+
+ private:
+  // Scans the given tokens one after another.
+  void ScannerTask(const vector<KuduScanToken *>& tokens);
+  // Periodically logs total_count_ while scanning is in progress.
+  void MonitorTask();
+
+  // Number of rows scanned so far, summed over all scanner tasks.
+  AtomicInt<uint64_t> total_count_;
+  client::sp::shared_ptr<KuduClient> client_;
+  std::string table_name_;
+  // Pool the scanner and monitor tasks run on; built in Run().
+  gscoped_ptr<ThreadPool> thread_pool_;
+};
+} // namespace tools
+} // namespace kudu
diff --git a/src/kudu/tools/tool_action_cluster.cc b/src/kudu/tools/tool_action_cluster.cc
index 6a369f8..59d5f00 100644
--- a/src/kudu/tools/tool_action_cluster.cc
+++ b/src/kudu/tools/tool_action_cluster.cc
@@ -64,9 +64,7 @@ using strings::Substitute;
} while (0);
DECLARE_string(tables);
-DEFINE_string(tablets, "",
- "Tablets to check (comma-separated list of IDs) "
- "If not specified, checks all tablets.");
+DECLARE_string(tablets);
DEFINE_string(sections, "*",
"Sections to print (comma-separated list of sections, "
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 472cb30..24ab535 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -96,6 +96,9 @@ DEFINE_string(print_entries, "decoded",
" id = print only their ids");
DEFINE_string(table_name, "",
"Restrict output to a specific table by name");
+DEFINE_string(tablets, "",
+ "Tablets to check (comma-separated list of IDs) "
+ "If not specified, checks all tablets.");
DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
DEFINE_int32(truncate_data, 100,
"Truncate the data fields to the given number of bytes "
@@ -116,6 +119,9 @@ DEFINE_string(tables, "", "Tables to include (comma-separated list of table name
DEFINE_string(memtracker_output, "table",
"One of 'json', 'json_compact' or 'table'. Table output flattens "
"the memtracker hierarchy.");
+DEFINE_int32(num_threads, 2,
+ "Number of threads to run. Each thread runs its own "
+ "KuduSession.");
namespace boost {
template <typename Signature>
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index e86e5c0..fdd3d94 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -264,9 +264,7 @@ DEFINE_uint64(num_rows_per_thread, 1000,
"Number of rows each thread generates and inserts; "
"0 means unlimited. All rows generated by a thread are inserted "
"in the context of the same session.");
-DEFINE_int32(num_threads, 2,
- "Number of generator threads to run. Each thread runs its own "
- "KuduSession.");
+DECLARE_int32(num_threads);
DEFINE_bool(run_scan, false,
"Whether to run post-insertion scan to verify that the count of "
"the inserted rows matches the expected number. If enabled, "
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 0519c88..a0c3f06 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -41,46 +41,47 @@
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/table_scanner.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/jsonreader.h"
#include "kudu/util/status.h"
-DECLARE_string(tables);
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanToken;
+using kudu::client::KuduScanTokenBuilder;
+using kudu::client::KuduScanner;
+using kudu::client::KuduSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableAlterer;
+using kudu::client::internal::ReplicaController;
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
DEFINE_bool(check_row_existence, false,
"Also check for the existence of the row on the leader replica of "
"the tablet. If found, the full row will be printed; if not found, "
"an error message will be printed and the command will return a "
"non-zero status.");
+DEFINE_bool(list_tablets, false,
+ "Include tablet and replica UUIDs in the output");
DEFINE_bool(modify_external_catalogs, true,
"Whether to modify external catalogs, such as the Hive Metastore, "
"when renaming or dropping a table.");
-DEFINE_bool(list_tablets, false,
- "Include tablet and replica UUIDs in the output");
+DECLARE_string(tables);
namespace kudu {
namespace tools {
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduColumnSchema;
-using client::KuduPredicate;
-using client::KuduScanner;
-using client::KuduScanToken;
-using client::KuduScanTokenBuilder;
-using client::KuduSchema;
-using client::KuduTable;
-using client::KuduTableAlterer;
-using client::internal::ReplicaController;
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-using strings::Split;
-using strings::Substitute;
-
// This class only exists so that ListTables() can easily be friended by
// KuduReplica, KuduReplica::Data, and KuduClientBuilder.
class TableLister {
@@ -391,6 +392,16 @@ Status ListTables(const RunnerContext& context) {
return TableLister::ListTablets(Split(master_addresses_str, ","));
}
+// Action handler for 'kudu table scan': creates a client from the runner
+// context, looks up the required table-name argument and runs a TableScanner
+// on that table.
+Status ScanTable(const RunnerContext& context) {
+  client::sp::shared_ptr<KuduClient> kudu_client;
+  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
+
+  const string& name = FindOrDie(context.required_args, kTableNameArg);
+  TableScanner table_scanner(kudu_client, name);
+  return table_scanner.Run();
+}
+
} // anonymous namespace
unique_ptr<Mode> BuildTableMode() {
@@ -452,6 +463,21 @@ unique_ptr<Mode> BuildTableMode() {
.AddOptionalParameter("modify_external_catalogs")
.Build();
+ unique_ptr<Action> scan_table =
+ ActionBuilder("scan", &ScanTable)
+ .Description("Scan rows from a table")
+ .ExtraDescription("Scan rows from an existing table. See the help "
+ "for the --predicates flag on how predicates can be specified.")
+ .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
+ .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"})
+ .AddOptionalParameter("columns")
+ .AddOptionalParameter("fill_cache")
+ .AddOptionalParameter("num_threads")
+ .AddOptionalParameter("predicates")
+ .AddOptionalParameter("show_value")
+ .AddOptionalParameter("tablets")
+ .Build();
+
return ModeBuilder("table")
.Description("Operate on Kudu tables")
.AddAction(std::move(delete_table))
@@ -460,6 +486,7 @@ unique_ptr<Mode> BuildTableMode() {
.AddAction(std::move(locate_row))
.AddAction(std::move(rename_column))
.AddAction(std::move(rename_table))
+ .AddAction(std::move(scan_table))
.Build();
}