You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/18 15:12:48 UTC
[2/9] incubator-impala git commit: IMPALA-2615: support [[nodiscard]]
on Status
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 026a06e..a170ea1 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -73,7 +73,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
// set of in-flight queries.
Status status = SetQueryInflight(session, request_state);
if (!status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
@@ -111,7 +111,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
// set of in-flight queries.
Status status = SetQueryInflight(session, request_state);
if (!status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
// block until results are ready
@@ -121,7 +121,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
status = request_state->query_status();
}
if (!status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
@@ -171,7 +171,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
VLOG_ROW << "fetch result: #results=" << query_results.data.size()
<< " has_more=" << (query_results.has_more ? "true" : "false");
if (!status.ok()) {
- (void) UnregisterQuery(query_id, false, &status);
+ discard_result(UnregisterQuery(query_id, false, &status));
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 2528349..6a1b5f4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -154,7 +154,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
Status exec_status = request_state->Exec(*request);
if (!exec_status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &exec_status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &exec_status));
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
status->__set_errorMessage(exec_status.GetDetail());
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
@@ -165,7 +165,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
Status inflight_status = SetQueryInflight(session, request_state);
if (!inflight_status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &inflight_status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &inflight_status));
status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
status->__set_errorMessage(inflight_status.GetDetail());
status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
@@ -340,8 +340,8 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
} else {
// Normal configuration key. Use it to set session default query options.
// Ignore failure (failures will be logged in SetQueryOption()).
- SetQueryOption(v.first, v.second, &state->default_query_options,
- &state->set_query_options_mask);
+ discard_result(SetQueryOption(v.first, v.second, &state->default_query_options,
+ &state->set_query_options_mask));
}
}
}
@@ -465,7 +465,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
session->hs2_version, *request_state->result_metadata(), nullptr),
cache_num_rows);
if (!status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &status));
HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
}
@@ -476,7 +476,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
// set of in-flight queries.
status = SetQueryInflight(session, request_state);
if (!status.ok()) {
- (void) UnregisterQuery(request_state->query_id(), false, &status);
+ discard_result(UnregisterQuery(request_state->query_id(), false, &status));
HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
return_val.__isset.operationHandle = true;
@@ -795,7 +795,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
if (status.IsRecoverableError()) {
DCHECK(fetch_first);
} else {
- (void) UnregisterQuery(query_id, false, &status);
+ discard_result(UnregisterQuery(query_id, false, &status));
}
HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 5a79d9d..79903b4 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -547,11 +547,12 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::ArgumentMap& args
// Get the object type and name from the topic entry key
TCatalogObject request;
- TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
-
- // Get the object and dump its contents.
TCatalogObject result;
- Status status = server_->exec_env_->frontend()->GetCatalogObject(request, &result);
+ Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+ if (status.ok()) {
+ // Get the object and dump its contents.
+ status = server_->exec_env_->frontend()->GetCatalogObject(request, &result);
+ }
if (status.ok()) {
Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
document->AddMember("thrift_string", debug_string, document->GetAllocator());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 00a9d9a..e173c84 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -73,6 +73,7 @@
#include "util/runtime-profile.h"
#include "util/string-parser.h"
#include "util/summary-util.h"
+#include "util/test-info.h"
#include "util/uid-util.h"
#include "gen-cpp/Types_types.h"
@@ -357,21 +358,22 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics()));
- // Register the membership callback if required
- if (exec_env->subscriber() != nullptr) {
+ // Register the membership callback if running in a real cluster.
+ if (!TestInfo::is_test()) {
auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
vector<TTopicDelta>* topic_updates) {
this->MembershipCallback(state, topic_updates);
};
- exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb);
+ ABORT_IF_ERROR(
+ exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb));
if (FLAGS_is_coordinator) {
auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
vector<TTopicDelta>* topic_updates) {
this->CatalogUpdateCallback(state, topic_updates);
};
- exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true,
- catalog_cb);
+ ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+ CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb));
}
}
@@ -581,11 +583,11 @@ void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) {
if (IsAuditEventLoggingEnabled() &&
(Frontend::IsAuthorizationError(request_state.query_status()) || log_events)) {
// TODO: deal with an error status
- (void) LogAuditRecord(request_state, request_state.exec_request());
+ discard_result(LogAuditRecord(request_state, request_state.exec_request()));
}
if (IsLineageLoggingEnabled() && log_events) {
// TODO: deal with an error status
- (void) LogLineageRecord(request_state);
+ discard_result(LogLineageRecord(request_state));
}
}
@@ -622,7 +624,7 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
request_state->user_has_profile_access()));
if (base64_encoded) {
- request_state->profile().SerializeToArchiveString(output);
+ RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output));
} else {
request_state->profile().PrettyPrint(output);
}
@@ -703,7 +705,10 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
[[noreturn]] void ImpalaServer::LogFileFlushThread() {
while (true) {
sleep(5);
- profile_logger_->Flush();
+ const Status status = profile_logger_->Flush();
+ if (!status.ok()) {
+ LOG(WARNING) << "Error flushing profile log: " << status.GetDetail();
+ }
}
}
@@ -736,14 +741,21 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
}
void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
- const string& encoded_profile_str = query.profile().SerializeToArchiveString();
+ string encoded_profile_str;
+ Status status = query.profile().SerializeToArchiveString(&encoded_profile_str);
+ if (!status.ok()) {
+ // Didn't serialize the string. Continue with empty string.
+ LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
+ << status.GetDetail();
+ return;
+ }
// If there was an error initialising archival (e.g. directory is not writeable),
// FLAGS_log_query_to_file will have been set to false
if (FLAGS_log_query_to_file) {
stringstream ss;
ss << UnixMillis() << " " << query.query_id() << " " << encoded_profile_str;
- Status status = profile_logger_->AppendEntry(ss.str());
+ status = profile_logger_->AppendEntry(ss.str());
if (!status.ok()) {
LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file file ("
<< google::COUNTER << " attempts failed): "
@@ -826,7 +838,7 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
Status status = ExecuteInternal(*query_ctx, session_state, ®istered_request_state,
request_state);
if (!status.ok() && registered_request_state) {
- (void) UnregisterQuery((*request_state)->query_id(), false, &status);
+ discard_result(UnregisterQuery((*request_state)->query_id(), false, &status));
}
return status;
}
@@ -1105,7 +1117,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
Status status("Session closed");
for (const TUniqueId& query_id: inflight_queries) {
// TODO: deal with an error status
- (void) UnregisterQuery(query_id, false, &status);
+ discard_result(UnregisterQuery(query_id, false, &status));
}
// Reconfigure the poll period of session_timeout_thread_ if necessary.
int32_t session_timeout = session_state->session_timeout;
@@ -1427,7 +1439,7 @@ void ImpalaServer::CatalogUpdateCallback(
}
ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
// TODO: deal with an error status
- (void) UpdateCatalogMetrics();
+ discard_result(UpdateCatalogMetrics());
// Remove all dropped objects from the library cache.
// TODO: is this expensive? We'd like to process heartbeats promptly.
for (TCatalogObject& object: dropped_objects) {
@@ -1690,7 +1702,12 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
request_state.profile().PrettyPrint(&ss);
profile_str = ss.str();
if (encoded_profile.empty()) {
- encoded_profile_str = request_state.profile().SerializeToArchiveString();
+ Status status =
+ request_state.profile().SerializeToArchiveString(&encoded_profile_str);
+ if (!status.ok()) {
+ LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
+ << status.GetDetail();
+ }
} else {
encoded_profile_str = encoded_profile;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 32fbcc8..8a7961c 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -77,7 +77,8 @@ int ImpaladMain(int argc, char** argv) {
// start backend service for the coordinator on be_port
ExecEnv exec_env;
- StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true);
+ ABORT_IF_ERROR(
+ StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true));
InitRpcEventTracing(exec_env.webserver());
CommonMetrics::InitCommonMetrics(exec_env.metrics());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 9006947..397df5a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -74,7 +74,8 @@ TEST(QueryOptions, SetFilterWait) {
EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "-1", &options, NULL).ok());
EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS",
- lexical_cast<string>(numeric_limits<int32_t>::max() + 1), &options, NULL).ok());
+ lexical_cast<string>(static_cast<int64_t>(numeric_limits<int32_t>::max()) + 1),
+ &options, NULL).ok());
EXPECT_OK(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "0", &options, NULL));
EXPECT_EQ(0, options.runtime_filter_wait_time_ms);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 9f6097c..5d7738c 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -720,8 +720,13 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
// Schedule the next message.
VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
<< subscriber->id() << " is in " << deadline_ms << "ms";
- OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
+ status = OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
&subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
+ if (!status.ok()) {
+ LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")
+ << " message to subscriber '" << subscriber->id() << "': "
+ << status.GetDetail();
+ }
}
}
}
@@ -755,7 +760,6 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
subscribers_.erase(subscriber->id());
}
-Status Statestore::MainLoop() {
+void Statestore::MainLoop() {
subscriber_topic_update_threadpool_.Join();
- return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 44d9792..b3ba315 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -108,14 +108,12 @@ class Statestore : public CacheLineAligned {
Status RegisterSubscriber(const SubscriberId& subscriber_id,
const TNetworkAddress& location,
const std::vector<TTopicRegistration>& topic_registrations,
- TUniqueId* registration_id);
+ TUniqueId* registration_id) WARN_UNUSED_RESULT;
void RegisterWebpages(Webserver* webserver);
/// The main processing loop. Blocks until the exit flag is set.
- //
- /// Returns OK unless there is an unrecoverable error.
- Status MainLoop();
+ void MainLoop();
/// Returns the Thrift API interface that proxies requests onto the local Statestore.
const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
@@ -439,10 +437,10 @@ class Statestore : public CacheLineAligned {
/// Utility method to add an update to the given thread pool, and to fail if the thread
/// pool is already at capacity.
Status OfferUpdate(const ScheduledSubscriberUpdate& update,
- ThreadPool<ScheduledSubscriberUpdate>* thread_pool);
+ ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
- /// Sends either a heartbeat or topic update message to the subscriber in 'update' at the
- /// closest possible time to the first member of 'update'. If is_heartbeat is true,
+ /// Sends either a heartbeat or topic update message to the subscriber in 'update' at
+ /// the closest possible time to the first member of 'update'. If is_heartbeat is true,
/// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once
/// complete, the next update is scheduled and added to the appropriate queue.
void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
@@ -458,14 +456,15 @@ class Statestore : public CacheLineAligned {
/// will return OK (since there was no error) and the output parameter update_skipped is
/// set to true. Otherwise, any updates returned by the subscriber are applied to their
/// target topics.
- Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped);
+ Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT;
/// Sends a heartbeat message to subscriber. Returns false if there was some error
/// performing the RPC.
- Status SendHeartbeat(Subscriber* subscriber);
+ Status SendHeartbeat(Subscriber* subscriber) WARN_UNUSED_RESULT;
/// Unregister a subscriber, removing all of its transient entries and evicting it from
- /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this method.
+ /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this
+ /// method.
void UnregisterSubscriber(Subscriber* subscriber);
/// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1dcb682..1f06f04 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -67,10 +67,12 @@ int StatestoredMain(int argc, char** argv) {
LOG(INFO) << "Not starting webserver";
}
- metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr);
+ ABORT_IF_ERROR(
+ metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr));
StartMemoryMaintenanceThread();
- StartThreadInstrumentation(metrics.get(), webserver.get(), false);
+ ABORT_IF_ERROR(
+ StartThreadInstrumentation(metrics.get(), webserver.get(), false));
InitRpcEventTracing(webserver.get());
// TODO: Add a 'common metrics' method to add standard metrics to
// both statestored and impalad
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/death-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/death-test-util.h b/be/src/testutil/death-test-util.h
index 6421fb7..474025b 100644
--- a/be/src/testutil/death-test-util.h
+++ b/be/src/testutil/death-test-util.h
@@ -25,10 +25,10 @@
// Wrapper around gtest's ASSERT_DEBUG_DEATH that prevents coredumps and minidumps
// being generated as the result of the death test.
#ifndef NDEBUG
-#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg) \
- do { \
- ScopedCoredumpDisabler disable_coredumps; \
- ASSERT_DEBUG_DEATH((void)fn, msg); \
+#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg) \
+ do { \
+ ScopedCoredumpDisabler disable_coredumps; \
+ ASSERT_DEBUG_DEATH((void)fn, msg); \
} while (false);
#else
// Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug builds it
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/fault-injection-util.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
index e2c32b1..f378c48 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -19,6 +19,8 @@
#include "testutil/fault-injection-util.h"
+#include <random>
+
#include <thrift/transport/TSSLSocket.h>
#include <thrift/transport/TTransportException.h>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/impalad-query-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/impalad-query-executor.cc b/be/src/testutil/impalad-query-executor.cc
index db0aceb..111b2a6 100644
--- a/be/src/testutil/impalad-query-executor.cc
+++ b/be/src/testutil/impalad-query-executor.cc
@@ -44,7 +44,7 @@ ImpaladQueryExecutor::ImpaladQueryExecutor(const string& hostname, uint32_t port
}
ImpaladQueryExecutor::~ImpaladQueryExecutor() {
- Close();
+ discard_result(Close());
}
Status ImpaladQueryExecutor::Setup() {
@@ -71,7 +71,7 @@ Status ImpaladQueryExecutor::Close() {
Status ImpaladQueryExecutor::Exec(
const string& query_string, vector<FieldSchema>* col_schema) {
// close anything that ran previously
- Close();
+ discard_result(Close());
Query query;
query.query = query_string;
query.configuration = exec_options_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 7a81915..4fecfb3 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -68,7 +68,8 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
// pick a new set of ports
Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
if (started.ok()) {
- impala->SetCatalogInitialized();
+ const Status status = impala->SetCatalogInitialized();
+ if (!status.ok()) LOG(WARNING) << status.GetDetail();
return impala;
}
delete impala;
@@ -88,13 +89,14 @@ InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend
statestore_host, statestore_port)) {
}
-void InProcessImpalaServer::SetCatalogInitialized() {
+Status InProcessImpalaServer::SetCatalogInitialized() {
DCHECK(impala_server_ != NULL) << "Call Start*() first.";
- exec_env_->frontend()->SetCatalogInitialized();
+ return exec_env_->frontend()->SetCatalogInitialized();
}
Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
RETURN_IF_ERROR(exec_env_->StartServices());
+
beeswax_port_ = beeswax_port;
hs2_port_ = hs2_port;
ThriftServer* be_server;
@@ -158,7 +160,7 @@ InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port
}
Status InProcessStatestore::Start() {
- webserver_->Start();
+ RETURN_IF_ERROR(webserver_->Start());
boost::shared_ptr<TProcessor> processor(
new StatestoreServiceProcessor(statestore_->thrift_iface()));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 3f91b2a..03b02f3 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -74,7 +74,7 @@ class InProcessImpalaServer {
/// Sets the catalog on this impalad to be initialized. If we don't
/// start up a catalogd, then there is no one to initialize it otherwise.
- void SetCatalogInitialized();
+ Status SetCatalogInitialized();
uint32_t beeswax_port() const { return beeswax_port_; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/util/benchmark.cc b/be/src/util/benchmark.cc
index 43570a6..6ffbf00 100644
--- a/be/src/util/benchmark.cc
+++ b/be/src/util/benchmark.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <cmath>
#include <iomanip>
#include <iostream>
#include <sstream>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 2c40e569..5f8d443 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -17,10 +17,12 @@
#include <stdlib.h>
#include <stdio.h>
-#include <iostream>
-#include <algorithm>
#include <limits.h>
+#include <algorithm>
+#include <iostream>
+#include <numeric>
+
#include <boost/utility.hpp>
#include "testutil/gtest-util.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 9475ec1..b150b3c 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -63,7 +63,8 @@ class Codec {
/// If mem_pool is nullptr, then the resulting codec will never allocate memory and
/// the caller must be responsible for it.
static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
- THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor);
+ THdfsCompression::type format,
+ boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT;
/// Alternate factory method: takes a codec string and populates a scoped pointer.
static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
@@ -88,7 +89,8 @@ class Codec {
/// Return the name of a compression algorithm.
static std::string GetCodecName(THdfsCompression::type);
/// Returns the java class name for the given compression type
- static Status GetHadoopCodecClassName(THdfsCompression::type, std::string* out_name);
+ static Status GetHadoopCodecClassName(
+ THdfsCompression::type, std::string* out_name) WARN_UNUSED_RESULT;
virtual ~Codec() {}
@@ -109,7 +111,8 @@ class Codec {
/// input_length: length of the data to process
/// input: data to process
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
- const uint8_t* input, int64_t* output_length, uint8_t** output) = 0;
+ const uint8_t* input, int64_t* output_length,
+ uint8_t** output) WARN_UNUSED_RESULT = 0;
/// Wrapper to the actual ProcessBlock() function. This wrapper uses lengths as ints and
/// not int64_ts. We need to keep this interface because the Parquet thrift uses ints.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/filesystem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 3e824b8..1d497c3 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -31,22 +31,23 @@ class FileSystemUtil {
/// Create the specified directory and any ancestor directories that do not exist yet.
/// The directory and its contents are destroyed if it already exists.
/// Returns Status::OK if successful, or a runtime error with a message otherwise.
- static Status RemoveAndCreateDirectory(const std::string& directory);
+ static Status RemoveAndCreateDirectory(const std::string& directory) WARN_UNUSED_RESULT;
/// Create a file at the specified path.
- static Status CreateFile(const std::string& file_path);
+ static Status CreateFile(const std::string& file_path) WARN_UNUSED_RESULT;
/// Remove the specified paths and their enclosing files/directories.
- static Status RemovePaths(const std::vector<std::string>& directories);
+ static Status RemovePaths(
+ const std::vector<std::string>& directories) WARN_UNUSED_RESULT;
/// Verify that the specified path is an existing directory.
/// Returns Status::OK if it is, or a runtime error with a message otherwise.
- static Status VerifyIsDirectory(const std::string& directory_path);
+ static Status VerifyIsDirectory(const std::string& directory_path) WARN_UNUSED_RESULT;
/// Returns the space available on the file system containing 'directory_path'
/// in 'available_bytes'
- static Status GetSpaceAvailable(const std::string& directory_path,
- uint64_t* available_bytes);
+ static Status GetSpaceAvailable(
+ const std::string& directory_path, uint64_t* available_bytes) WARN_UNUSED_RESULT;
/// Returns the currently allowed maximum of possible file descriptors. In case of an
/// error returns 0.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/hdfs-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
index 89cc43a..b389864 100644
--- a/be/src/util/hdfs-util-test.cc
+++ b/be/src/util/hdfs-util-test.cc
@@ -34,7 +34,8 @@ TEST(HdfsUtilTest, CheckFilesystemsMatch) {
ExecEnv* exec_env = new ExecEnv();
// We do this to retrieve the default FS from the frontend.
- exec_env->StartServices();
+ // It doesn't matter if starting the services fails.
+ discard_result(exec_env->StartServices());
// Tests with both paths qualified.
EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 78f67a7..220eb83 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -100,12 +100,6 @@ Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global
return Status::OK();
}
-Status JniUtil::FreeGlobalRef(JNIEnv* env, jobject global_ref) {
- env->DeleteGlobalRef(global_ref);
- RETURN_ERROR_IF_EXC(env);
- return Status::OK();
-}
-
Status JniUtil::Init() {
// Get the JNIEnv* corresponding to current thread.
JNIEnv* env = getJNIEnv();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 9a9cb15..77abb4d 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -124,7 +124,7 @@ class JniLocalFrame {
/// The number of local references created inside the frame might exceed max_local_ref,
/// but there is no guarantee that memory will be available.
/// Push should be called at most once.
- Status push(JNIEnv* env, int max_local_ref=10);
+ Status push(JNIEnv* env, int max_local_ref = 10) WARN_UNUSED_RESULT;
private:
JNIEnv* env_;
@@ -187,7 +187,7 @@ class JniUtil {
static void InitLibhdfs();
/// Find JniUtil class, and get JniUtil.throwableToString method id
- static Status Init();
+ static Status Init() WARN_UNUSED_RESULT;
/// Returns true if the given class could be found on the CLASSPATH in env.
/// Returns false otherwise, or if any other error occurred (e.g. a JNI exception).
@@ -204,13 +204,15 @@ class JniUtil {
/// The returned reference must eventually be freed by calling FreeGlobalRef() (or have
/// the lifetime of the impalad process).
/// Catches Java exceptions and converts their message into status.
- static Status GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref);
+ static Status GetGlobalClassRef(
+ JNIEnv* env, const char* class_str, jclass* class_ref) WARN_UNUSED_RESULT;
/// Creates a global reference from a local reference returned into global_ref.
/// The returned reference must eventually be freed by calling FreeGlobalRef() (or have
/// the lifetime of the impalad process).
/// Catches Java exceptions and converts their message into status.
- static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref);
+ static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref,
+ jobject* global_ref) WARN_UNUSED_RESULT;
/// Templated wrapper for jobject subclasses (e.g. jclass, jarray). This is necessary
/// because according to
@@ -224,15 +226,11 @@ class JniUtil {
/// to use a subclass like _jclass**. This is safe in this case because the returned
/// subclass is known to be correct.
template <typename jobject_subclass>
- static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref,
- jobject_subclass* global_ref) {
+ static Status LocalToGlobalRef(
+ JNIEnv* env, jobject local_ref, jobject_subclass* global_ref) {
return LocalToGlobalRef(env, local_ref, reinterpret_cast<jobject*>(global_ref));
}
- /// Deletes 'global_ref'. Catches Java exceptions and converts their message into
- /// status.
- static Status FreeGlobalRef(JNIEnv* env, jobject global_ref);
-
static jmethodID throwable_to_string_id() { return throwable_to_string_id_; }
static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; }
@@ -246,30 +244,31 @@ class JniUtil {
/// log_stack determines if the stack trace is written to the log
/// prefix, if non-empty will be prepended to the error message.
static Status GetJniExceptionMsg(JNIEnv* env, bool log_stack = true,
- const std::string& prefix = "");
+ const std::string& prefix = "") WARN_UNUSED_RESULT;
/// Populates 'result' with a list of memory metrics from the Jvm. Returns Status::OK
/// unless there is an exception.
static Status GetJvmMetrics(const TGetJvmMetricsRequest& request,
- TGetJvmMetricsResponse* result);
+ TGetJvmMetricsResponse* result) WARN_UNUSED_RESULT;
// Populates 'result' with information about live JVM threads. Returns
// Status::OK unless there is an exception.
static Status GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request,
- TGetJvmThreadsInfoResponse* result);
+ TGetJvmThreadsInfoResponse* result) WARN_UNUSED_RESULT;
/// Loads a method whose signature is in the supplied descriptor. Returns Status::OK
/// and sets descriptor->method_id to a JNI method handle if successful, otherwise an
/// error status is returned.
static Status LoadJniMethod(JNIEnv* jni_env, const jclass& jni_class,
- JniMethodDescriptor* descriptor);
+ JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT;
/// Same as LoadJniMethod(...), except that this loads a static method.
static Status LoadStaticJniMethod(JNIEnv* jni_env, const jclass& jni_class,
- JniMethodDescriptor* descriptor);
+ JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT;
/// Utility methods to avoid repeating lots of the JNI call boilerplate.
- static Status CallJniMethod(const jobject& obj, const jmethodID& method) {
+ static Status CallJniMethod(
+ const jobject& obj, const jmethodID& method) WARN_UNUSED_RESULT {
JNIEnv* jni_env = getJNIEnv();
JniLocalFrame jni_frame;
RETURN_IF_ERROR(jni_frame.push(jni_env));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index ffc3d20..6f306f7 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -155,7 +155,7 @@ class JvmMetric : public IntGauge {
public:
/// Registers many Jvm memory metrics: one for every member of JvmMetricType for each
/// pool (usually ~5 pools plus a synthetic 'total' pool).
- static Status InitMetrics(MetricGroup* metrics);
+ static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
protected:
/// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
@@ -193,7 +193,7 @@ class JvmMetric : public IntGauge {
class BufferPoolMetric : public UIntGauge {
public:
static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations,
- BufferPool* buffer_pool);
+ BufferPool* buffer_pool) WARN_UNUSED_RESULT;
/// Global metrics, initialized by CreateAndRegisterMetrics().
static BufferPoolMetric* LIMIT;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 543e0e3..f25d5ad 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -217,7 +217,7 @@ TEST_F(MetricsTest, StatsMetricsSingle) {
TEST_F(MetricsTest, MemMetric) {
#ifndef ADDRESS_SANITIZER
MetricGroup metrics("MemMetrics");
- RegisterMemoryMetrics(&metrics, false, nullptr, nullptr);
+ ASSERT_OK(RegisterMemoryMetrics(&metrics, false, nullptr, nullptr));
// Smoke test to confirm that tcmalloc metrics are returning reasonable values.
UIntGauge* bytes_in_use =
metrics.FindMetricForTesting<UIntGauge>("tcmalloc.bytes-in-use");
@@ -249,7 +249,7 @@ TEST_F(MetricsTest, MemMetric) {
TEST_F(MetricsTest, JvmMetrics) {
MetricGroup metrics("JvmMetrics");
- RegisterMemoryMetrics(&metrics, true, nullptr, nullptr);
+ ASSERT_OK(RegisterMemoryMetrics(&metrics, true, nullptr, nullptr));
UIntGauge* jvm_total_used =
metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<UIntGauge>(
"jvm.total.current-usage-bytes");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 2615184..1783589 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -32,7 +32,7 @@ typedef std::string IpAddr;
/// 'address'. If the IP addresses of a host don't change, then subsequent calls will
/// always return the same address. Returns an error status if any system call failed,
/// otherwise OK. Even if OK is returned, addresses may still be of zero length.
-Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT;
/// Finds the first non-localhost IP address in the given list. Returns
/// true if such an address was found, false otherwise.
@@ -40,7 +40,7 @@ bool FindFirstNonLocalhost(const std::vector<std::string>& addresses, std::strin
/// Sets the output argument to the system defined hostname.
/// Returns OK if a hostname can be found, false otherwise.
-Status GetHostname(std::string* hostname);
+Status GetHostname(std::string* hostname) WARN_UNUSED_RESULT;
/// Utility method because Thrift does not supply useful constructors
TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e21cff6..d5b0d01 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -136,6 +136,7 @@ class ParquetLevelReader : public impala::RleDecoder {
// were actually written if the final run is a literal run, only if the final run is
// a repeated run (see util/rle-encoding.h for more details).
// Returns the number of rows specified by the header.
+// Aborts the process if reading the file fails.
int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
const uint8_t* data = page;
std::vector<uint8_t> decompressed_buffer;
@@ -143,8 +144,8 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
decompressed_buffer.resize(header.uncompressed_page_size);
boost::scoped_ptr<impala::Codec> decompressor;
- impala::Codec::CreateDecompressor(
- NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
+ ABORT_IF_ERROR(impala::Codec::CreateDecompressor(NULL, false,
+ impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor));
uint8_t* buffer_ptr = decompressed_buffer.data();
int uncompressed_page_size = header.uncompressed_page_size;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index ec31fc9..12f4e25 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -33,6 +33,7 @@
#include "util/periodic-counter-updater.h"
#include "util/pretty-printer.h"
#include "util/redactor.h"
+#include "util/scope-exit-trigger.h"
#include "common/names.h"
@@ -723,37 +724,39 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
}
}
-string RuntimeProfile::SerializeToArchiveString() const {
+Status RuntimeProfile::SerializeToArchiveString(string* out) const {
stringstream ss;
- SerializeToArchiveString(&ss);
- return ss.str();
+ RETURN_IF_ERROR(SerializeToArchiveString(&ss));
+ *out = ss.str();
+ return Status::OK();
}
-void RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+ Status status;
TRuntimeProfileTree thrift_object;
const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
ThriftSerializer serializer(true);
vector<uint8_t> serialized_buffer;
- Status status = serializer.Serialize(&thrift_object, &serialized_buffer);
- if (!status.ok()) return;
+ RETURN_IF_ERROR(serializer.Serialize(&thrift_object, &serialized_buffer));
// Compress the serialized thrift string. This uses string keys and is very
// easy to compress.
scoped_ptr<Codec> compressor;
- status = Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor);
- DCHECK(status.ok()) << status.GetDetail();
- if (!status.ok()) return;
+ RETURN_IF_ERROR(
+ Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor));
+ const auto close_compressor =
+ MakeScopeExitTrigger([&compressor]() { compressor->Close(); });
vector<uint8_t> compressed_buffer;
compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size()));
int64_t result_len = compressed_buffer.size();
uint8_t* compressed_buffer_ptr = compressed_buffer.data();
- compressor->ProcessBlock(true, serialized_buffer.size(), serialized_buffer.data(),
- &result_len, &compressed_buffer_ptr);
+ RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(),
+ serialized_buffer.data(), &result_len, &compressed_buffer_ptr));
compressed_buffer.resize(result_len);
Base64Encode(compressed_buffer, out);
- compressor->Close();
+ return Status::OK();;
}
void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 244ab17..298c214 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -265,8 +265,8 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
/// object using thrift compact binary format, then gzip compresses it and
/// finally encodes it as base64. This is not a lightweight operation and
/// should not be in the hot path.
- std::string SerializeToArchiveString() const;
- void SerializeToArchiveString(std::stringstream* out) const;
+ Status SerializeToArchiveString(std::string* out) const WARN_UNUSED_RESULT;
+ Status SerializeToArchiveString(std::stringstream* out) const WARN_UNUSED_RESULT;
/// Divides all counters by n
void Divide(int n);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index cbc0031..800f690 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -52,7 +52,7 @@ class ThreadPool : public CacheLineAligned {
for (int i = 0; i < num_threads; ++i) {
std::stringstream threadname;
threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")";
- threads_.AddThread(new Thread(group, threadname.str(),
+ threads_.AddThread(std::make_unique<Thread>(group, threadname.str(),
boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index c84ef0b..0e08ab1 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -34,7 +34,6 @@
#include "common/names.h"
namespace this_thread = boost::this_thread;
-using boost::ptr_vector;
using namespace rapidjson;
namespace impala {
@@ -331,13 +330,12 @@ void Thread::SuperviseThread(const string& name, const string& category,
thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
}
-Status ThreadGroup::AddThread(Thread* thread) {
- threads_.push_back(thread);
- return Status::OK();
+void ThreadGroup::AddThread(unique_ptr<Thread> thread) {
+ threads_.emplace_back(move(thread));
}
void ThreadGroup::JoinAll() {
- for (const Thread& thread: threads_) thread.Join();
+ for (auto& thread : threads_) thread->Join();
}
int ThreadGroup::Size() const {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index e21be7c..18f3a75 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -18,11 +18,13 @@
#ifndef IMPALA_UTIL_THREAD_H
#define IMPALA_UTIL_THREAD_H
+#include <memory>
+#include <vector>
+
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
#include "common/status.h"
#include "util/promise.h"
@@ -173,7 +175,7 @@ class ThreadGroup {
/// will destroy it when the ThreadGroup is destroyed. Threads will linger until that
/// point (even if terminated), however, so callers should be mindful of the cost of
/// placing very many threads in this set.
- Status AddThread(Thread* thread);
+ void AddThread(std::unique_ptr<Thread> thread);
/// Waits for all threads to finish. DO NOT call this from a thread inside this set;
/// deadlock will predictably ensue.
@@ -184,7 +186,7 @@ class ThreadGroup {
private:
/// All the threads grouped by this set.
- boost::ptr_vector<Thread> threads_;
+ std::vector<std::unique_ptr<Thread>> threads_;
};
/// Initialises the threading subsystem. Must be called before a Thread is created.