You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this copy.
Posted to commits@impala.apache.org by he...@apache.org on 2016/09/21 04:00:40 UTC
[3/7] incubator-impala git commit: IMPALA-4160: Remove Llama support.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 5ee60d6..c8ebd5d 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -26,12 +26,10 @@
#include "common/global-types.h"
#include "common/status.h"
-#include "scheduling/query-resource-mgr.h"
#include "util/promise.h"
#include "util/runtime-profile.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
namespace impala {
@@ -74,30 +72,19 @@ class QuerySchedule {
const TQueryOptions& query_options, RuntimeProfile* summary_profile,
RuntimeProfile::EventSequence* query_events);
- /// Returns OK if reservation_ contains a matching resource for each
- /// of the hosts in fragment_exec_params_. Returns an error otherwise.
- Status ValidateReservation();
-
const TUniqueId& query_id() const { return query_id_; }
const TQueryExecRequest& request() const { return request_; }
const TQueryOptions& query_options() const { return query_options_; }
const std::string& request_pool() const { return request_pool_; }
void set_request_pool(const std::string& pool_name) { request_pool_ = pool_name; }
- bool HasReservation() const { return !reservation_.allocated_resources.empty(); }
-
- /// Granted or timed out reservations need to be released. In both such cases,
- /// the reservation_'s reservation_id is set.
- bool NeedsRelease() const { return reservation_.__isset.reservation_id; }
- /// Gets the estimated memory (bytes) and vcores per-node. Returns the user specified
- /// estimate (MEM_LIMIT query parameter) if provided or the estimate from planning if
- /// available, but is capped at the amount of physical memory to avoid problems if
- /// either estimate is unreasonably large.
+ /// Gets the estimated memory (bytes) per-node. Returns the user specified estimate
+ /// (MEM_LIMIT query parameter) if provided or the estimate from planning if available,
+ /// but is capped at the amount of physical memory to avoid problems if either estimate
+ /// is unreasonably large.
int64_t GetPerHostMemoryEstimate() const;
- int16_t GetPerHostVCores() const;
/// Total estimated memory for all nodes. set_num_hosts() must be set before calling.
int64_t GetClusterMemoryEstimate() const;
- void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst);
/// Helper methods used by scheduler to populate this QuerySchedule.
void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -116,10 +103,6 @@ class QuerySchedule {
const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
return unique_hosts_;
}
- TResourceBrokerReservationResponse* reservation() { return &reservation_; }
- const TResourceBrokerReservationRequest& reservation_request() const {
- return reservation_request_;
- }
bool is_admitted() const { return is_admitted_; }
void set_is_admitted(bool is_admitted) { is_admitted_ = is_admitted; }
RuntimeProfile* summary_profile() { return summary_profile_; }
@@ -127,10 +110,6 @@ class QuerySchedule {
void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
- /// Populates reservation_request_ ready to submit a query to Llama for all initial
- /// resources required for this query.
- void PrepareReservationRequest(const std::string& pool, const std::string& user);
-
private:
/// These references are valid for the lifetime of this query schedule because they
@@ -165,18 +144,9 @@ class QuerySchedule {
/// Request pool to which the request was submitted for admission.
std::string request_pool_;
- /// Reservation request to be submitted to Llama. Set in PrepareReservationRequest().
- TResourceBrokerReservationRequest reservation_request_;
-
- /// Fulfilled reservation request. Populated by scheduler.
- TResourceBrokerReservationResponse reservation_;
-
/// Indicates if the query has been admitted for execution.
bool is_admitted_;
- /// Resolves unique_hosts_ to node mgr addresses. Valid only after SetUniqueHosts() has
- /// been called.
- boost::scoped_ptr<ResourceResolver> resource_resolver_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/request-pool-service.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index 9f3363d..ea2553e 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -44,6 +44,7 @@ static const string DEFAULT_USER = "default";
DEFINE_string(fair_scheduler_allocation_path, "", "Path to the fair scheduler "
"allocation file (fair-scheduler.xml).");
+// TODO: Rename / cleanup now that Llama is removed (see IMPALA-4159).
DEFINE_string(llama_site_path, "", "Path to the Llama configuration file "
"(llama-site.xml). If set, fair_scheduler_allocation_path must also be set.");
@@ -74,7 +75,6 @@ DEFINE_bool(disable_pool_mem_limits, false, "Disables all per-pool mem limits.")
DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on the "
"maximum number of running requests.");
-DECLARE_bool(enable_rm);
// Pool name used when the configuration files are not specified.
static const string DEFAULT_POOL_NAME = "default-pool";
@@ -94,12 +94,7 @@ RequestPoolService::RequestPoolService(MetricGroup* metrics) :
resolve_pool_ms_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, RESOLVE_POOL_METRIC_NAME);
- if (FLAGS_fair_scheduler_allocation_path.empty() &&
- FLAGS_llama_site_path.empty()) {
- if (FLAGS_enable_rm) {
- CLEAN_EXIT_WITH_ERROR("If resource management is enabled, "
- "-fair_scheduler_allocation_path is required.");
- }
+ if (FLAGS_fair_scheduler_allocation_path.empty()) {
default_pool_only_ = true;
bool is_percent; // not used
int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_default_pool_mem_limit,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 8b074d2..4c3a967 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -31,7 +31,6 @@
#include "gen-cpp/PlanNodes_types.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
namespace impala {
@@ -57,21 +56,6 @@ class Scheduler {
/// Releases the reserved resources (if any) from the given schedule.
virtual Status Release(QuerySchedule* schedule) = 0;
- /// Notifies this scheduler that a resource reservation has been preempted by the
- /// central scheduler (Yarn via Llama). All affected queries are cancelled
- /// via their coordinator.
- virtual void HandlePreemptedReservation(const TUniqueId& reservation_id) = 0;
-
- /// Notifies this scheduler that a single resource with the given client resource id
- /// has been preempted by the central scheduler (Yarn via Llama). All affected queries
- /// are cancelled via their coordinator.
- virtual void HandlePreemptedResource(const TUniqueId& client_resource_id) = 0;
-
- /// Notifies this scheduler that a single resource with the given client resource id
- /// has been lost by the central scheduler (Yarn via Llama). All affected queries
- /// are cancelled via their coordinator.
- virtual void HandleLostResource(const TUniqueId& client_resource_id) = 0;
-
/// Initialises the scheduler, acquiring all resources needed to make
/// scheduling decisions once this method returns.
virtual Status Init() = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index 76da6f9..7116e21 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -830,8 +830,8 @@ class SchedulerWrapper {
scheduler_backend_address.hostname = scheduler_host.ip;
scheduler_backend_address.port = scheduler_host.be_port;
- scheduler_.reset(new SimpleScheduler(NULL, scheduler_backend_id,
- scheduler_backend_address, &metrics_, NULL, NULL, NULL));
+ scheduler_.reset(new SimpleScheduler(
+ NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
scheduler_->Init();
// Initialize the scheduler backend maps.
SendFullMembershipMap();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index f4b90cc..f3ba9a5 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -29,7 +29,6 @@
#include "common/logging.h"
#include "util/metrics.h"
-#include "resourcebroker/resource-broker.h"
#include "runtime/exec-env.h"
#include "runtime/coordinator.h"
#include "service/impala-server.h"
@@ -43,11 +42,9 @@
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
-#include "util/llama-util.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
#include "util/runtime-profile-counters.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
#include "common/names.h"
@@ -58,9 +55,6 @@ using namespace strings;
DECLARE_int32(be_port);
DECLARE_string(hostname);
-DECLARE_bool(enable_rm);
-DECLARE_int32(rm_default_cpu_vcores);
-DECLARE_string(rm_default_memory);
DEFINE_bool(disable_admission_control, false, "Disables admission control.");
@@ -79,8 +73,7 @@ const string SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
const string& backend_id, const TNetworkAddress& backend_address,
- MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
- RequestPoolService* request_pool_service)
+ MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
: backend_config_(std::make_shared<const BackendConfig>()),
metrics_(metrics->GetOrCreateChildGroup("scheduler")),
webserver_(webserver),
@@ -90,7 +83,6 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
total_assignments_(NULL),
total_local_assignments_(NULL),
initialized_(NULL),
- resource_broker_(resource_broker),
request_pool_service_(request_pool_service) {
local_backend_descriptor_.address = backend_address;
@@ -99,32 +91,10 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
admission_controller_.reset(
new AdmissionController(request_pool_service_, metrics, backend_address));
}
-
- if (FLAGS_enable_rm) {
- if (FLAGS_rm_default_cpu_vcores <= 0) {
- LOG(ERROR) << "Bad value for --rm_default_cpu_vcores (must be postive): "
- << FLAGS_rm_default_cpu_vcores;
- exit(1);
- }
- bool is_percent;
- int64_t mem_bytes =
- ParseUtil::ParseMemSpec(
- FLAGS_rm_default_memory, &is_percent, MemInfo::physical_mem());
- if (mem_bytes <= 1024 * 1024) {
- LOG(ERROR) << "Bad value for --rm_default_memory (must be larger than 1M):"
- << FLAGS_rm_default_memory;
- exit(1);
- } else if (is_percent) {
- LOG(ERROR) << "Must use absolute value for --rm_default_memory: "
- << FLAGS_rm_default_memory;
- exit(1);
- }
- }
}
SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
- MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
- RequestPoolService* request_pool_service)
+ MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
: backend_config_(std::make_shared<const BackendConfig>(backends)),
metrics_(metrics),
webserver_(webserver),
@@ -133,7 +103,6 @@ SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
total_assignments_(NULL),
total_local_assignments_(NULL),
initialized_(NULL),
- resource_broker_(resource_broker),
request_pool_service_(request_pool_service) {
DCHECK(backends.size() > 0);
local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
@@ -289,10 +258,7 @@ void SimpleScheduler::UpdateMembership(
// If this impalad is not in our view of the membership list, we should add it and
// tell the statestore.
- bool is_offline = ExecEnv::GetInstance() &&
- ExecEnv::GetInstance()->impala_server()->IsOffline();
- if (!is_offline &&
- current_membership_.find(local_backend_id_) == current_membership_.end()) {
+ if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
VLOG(1) << "Registering local backend with statestore";
subscriber_topic_updates->push_back(TTopicDelta());
TTopicDelta& update = subscriber_topic_updates->back();
@@ -308,13 +274,6 @@ void SimpleScheduler::UpdateMembership(
<< " " << status.GetDetail();
subscriber_topic_updates->pop_back();
}
- } else if (is_offline &&
- current_membership_.find(local_backend_id_) != current_membership_.end()) {
- LOG(WARNING) << "Removing offline ImpalaServer from statestore";
- subscriber_topic_updates->push_back(TTopicDelta());
- TTopicDelta& update = subscriber_topic_updates->back();
- update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
- update.topic_deletions.push_back(local_backend_id_);
}
if (metrics_ != NULL) {
num_fragment_instances_metric_->set_value(current_membership_.size());
@@ -626,7 +585,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
for (const FragmentExecParams& exec_params: *fragment_exec_params) {
unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
}
-
schedule->SetUniqueHosts(unique_hosts);
}
@@ -723,38 +681,12 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
schedule->set_request_pool(resolved_pool);
schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
- if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
- return Status("This Impala server is offline. Please retry your query later.");
- }
-
RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
ComputeFragmentHosts(schedule->request(), schedule);
ComputeFragmentExecParams(schedule->request(), schedule);
if (!FLAGS_disable_admission_control) {
RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
}
- if (!FLAGS_enable_rm) return Status::OK();
- string user = GetEffectiveUser(schedule->request().query_ctx.session);
- if (user.empty()) user = "default";
- schedule->PrepareReservationRequest(resolved_pool, user);
- const TResourceBrokerReservationRequest& reservation_request =
- schedule->reservation_request();
- if (!reservation_request.resources.empty()) {
- Status status = resource_broker_->Reserve(
- reservation_request, schedule->reservation());
- if (!status.ok()) {
- // Warn about missing table and/or column stats if necessary.
- const TQueryCtx& query_ctx = schedule->request().query_ctx;
- if (!query_ctx.__isset.parent_query_id &&
- query_ctx.__isset.tables_missing_stats &&
- !query_ctx.tables_missing_stats.empty()) {
- status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
- }
- return status;
- }
- RETURN_IF_ERROR(schedule->ValidateReservation());
- AddToActiveResourceMaps(*schedule->reservation(), coord);
- }
return Status::OK();
}
@@ -762,106 +694,9 @@ Status SimpleScheduler::Release(QuerySchedule* schedule) {
if (!FLAGS_disable_admission_control) {
RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule));
}
- if (FLAGS_enable_rm && schedule->NeedsRelease()) {
- DCHECK(resource_broker_ != NULL);
- Status status = resource_broker_->ReleaseReservation(
- schedule->reservation()->reservation_id);
- // Remove the reservation from the active-resource maps even if there was an error
- // releasing the reservation because the query running in the reservation is done.
- RemoveFromActiveResourceMaps(*schedule->reservation());
- RETURN_IF_ERROR(status);
- }
return Status::OK();
}
-void SimpleScheduler::AddToActiveResourceMaps(
- const TResourceBrokerReservationResponse& reservation, Coordinator* coord) {
- lock_guard<mutex> l(active_resources_lock_);
- active_reservations_[reservation.reservation_id] = coord;
- map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
- for (iter = reservation.allocated_resources.begin();
- iter != reservation.allocated_resources.end();
- ++iter) {
- TUniqueId client_resource_id;
- client_resource_id << iter->second.client_resource_id;
- active_client_resources_[client_resource_id] = coord;
- }
-}
-
-void SimpleScheduler::RemoveFromActiveResourceMaps(
- const TResourceBrokerReservationResponse& reservation) {
- lock_guard<mutex> l(active_resources_lock_);
- active_reservations_.erase(reservation.reservation_id);
- map<TNetworkAddress, llama::TAllocatedResource>::const_iterator iter;
- for (iter = reservation.allocated_resources.begin();
- iter != reservation.allocated_resources.end();
- ++iter) {
- TUniqueId client_resource_id;
- client_resource_id << iter->second.client_resource_id;
- active_client_resources_.erase(client_resource_id);
- }
-}
-
-// TODO: Refactor the Handle*{Reservation,Resource} functions to avoid code duplication.
-void SimpleScheduler::HandlePreemptedReservation(const TUniqueId& reservation_id) {
- VLOG_QUERY << "HandlePreemptedReservation client_id=" << reservation_id;
- Coordinator* coord = NULL;
- {
- lock_guard<mutex> l(active_resources_lock_);
- ActiveReservationsMap::iterator it = active_reservations_.find(reservation_id);
- if (it != active_reservations_.end()) coord = it->second;
- }
- if (coord == NULL) {
- LOG(WARNING) << "Ignoring preempted reservation id " << reservation_id
- << " because no active query using it was found.";
- } else {
- stringstream err_msg;
- err_msg << "Reservation " << reservation_id << " was preempted";
- Status status(err_msg.str());
- coord->Cancel(&status);
- }
-}
-
-void SimpleScheduler::HandlePreemptedResource(const TUniqueId& client_resource_id) {
- VLOG_QUERY << "HandlePreemptedResource client_id=" << client_resource_id;
- Coordinator* coord = NULL;
- {
- lock_guard<mutex> l(active_resources_lock_);
- ActiveClientResourcesMap::iterator it =
- active_client_resources_.find(client_resource_id);
- if (it != active_client_resources_.end()) coord = it->second;
- }
- if (coord == NULL) {
- LOG(WARNING) << "Ignoring preempted client resource id " << client_resource_id
- << " because no active query using it was found.";
- } else {
- stringstream err_msg;
- err_msg << "Resource " << client_resource_id << " was preempted";
- Status status(err_msg.str());
- coord->Cancel(&status);
- }
-}
-
-void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
- VLOG_QUERY << "HandleLostResource preempting client_id=" << client_resource_id;
- Coordinator* coord = NULL;
- {
- lock_guard<mutex> l(active_resources_lock_);
- ActiveClientResourcesMap::iterator it =
- active_client_resources_.find(client_resource_id);
- if (it != active_client_resources_.end()) coord = it->second;
- }
- if (coord == NULL) {
- LOG(WARNING) << "Ignoring lost client resource id " << client_resource_id
- << " because no active query using it was found.";
- } else {
- stringstream err_msg;
- err_msg << "Resource " << client_resource_id << " was lost";
- Status status(err_msg.str());
- coord->Cancel(&status);
- }
-}
-
SimpleScheduler::AssignmentCtx::AssignmentCtx(
const BackendConfig& backend_config,
IntCounter* total_assignments, IntCounter* total_local_assignments)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index dd119c2..8c96dde 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -36,12 +36,10 @@
#include "scheduling/admission-controller.h"
#include "scheduling/backend-config.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress
-#include "gen-cpp/ResourceBrokerService_types.h"
#include "rapidjson/rapidjson.h"
namespace impala {
-class ResourceBroker;
class Coordinator;
class SchedulerWrapper;
@@ -78,22 +76,18 @@ class SimpleScheduler : public Scheduler {
/// - backend_address - the address that this backend listens on
SimpleScheduler(StatestoreSubscriber* subscriber, const std::string& backend_id,
const TNetworkAddress& backend_address, MetricGroup* metrics, Webserver* webserver,
- ResourceBroker* resource_broker, RequestPoolService* request_pool_service);
+ RequestPoolService* request_pool_service);
/// Initialize with a list of <host:port> pairs in 'static' mode - i.e. the set of
/// backends is fixed and will not be updated.
SimpleScheduler(const std::vector<TNetworkAddress>& backends, MetricGroup* metrics,
- Webserver* webserver, ResourceBroker* resource_broker,
- RequestPoolService* request_pool_service);
+ Webserver* webserver, RequestPoolService* request_pool_service);
/// Register with the subscription manager if required
virtual impala::Status Init();
virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
virtual Status Release(QuerySchedule* schedule);
- virtual void HandlePreemptedReservation(const TUniqueId& reservation_id);
- virtual void HandlePreemptedResource(const TUniqueId& client_resource_id);
- virtual void HandleLostResource(const TUniqueId& client_resource_id);
private:
/// Map from a host's IP address to the next backend to be round-robin scheduled for
@@ -306,27 +300,6 @@ class SimpleScheduler : public Scheduler {
/// Current number of backends
IntGauge* num_fragment_instances_metric_;
- /// Protect active_reservations_ and active_client_resources_.
- boost::mutex active_resources_lock_;
-
- /// Map from a Llama reservation id to the coordinator of the query using that
- /// reservation. The map is used to cancel queries whose reservation has been preempted.
- /// Entries are added in Schedule() calls that result in granted resource allocations.
- /// Entries are removed in Release().
- typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap;
- ActiveReservationsMap active_reservations_;
-
- /// Map from client resource id to the coordinator of the query using that resource.
- /// The map is used to cancel queries whose resource(s) have been preempted.
- /// Entries are added in Schedule() calls that result in granted resource allocations.
- /// Entries are removed in Release().
- typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveClientResourcesMap;
- ActiveClientResourcesMap active_client_resources_;
-
- /// Resource broker that mediates resource requests between Impala and the Llama.
- /// Set to NULL if resource management is disabled.
- ResourceBroker* resource_broker_;
-
/// Used for user-to-pool resolution and looking up pool configurations. Not owned by
/// us.
RequestPoolService* request_pool_service_;
@@ -339,16 +312,6 @@ class SimpleScheduler : public Scheduler {
BackendConfigPtr GetBackendConfig() const;
void SetBackendConfig(const BackendConfigPtr& backend_config);
- /// Add the granted reservation and resources to the active_reservations_ and
- /// active_client_resources_ maps, respectively.
- void AddToActiveResourceMaps(
- const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
-
- /// Remove the given reservation and resources from the active_reservations_ and
- /// active_client_resources_ maps, respectively.
- void RemoveFromActiveResourceMaps(
- const TResourceBrokerReservationResponse& reservation);
-
/// Called asynchronously when an update is received from the subscription manager
void UpdateMembership(const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
std::vector<TTopicDelta>* subscriber_topic_updates);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c749a6a..1b10aec 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -60,7 +60,6 @@
#include "service/query-exec-state.h"
#include "scheduling/simple-scheduler.h"
#include "util/bit-util.h"
-#include "util/cgroups-mgr.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
@@ -177,9 +176,9 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
"QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
" the maximum allowable timeout.");
-DEFINE_string(local_nodemanager_url, "", "The URL of the local Yarn Node Manager's HTTP "
- "interface, used to detect if the Node Manager fails");
-DECLARE_bool(enable_rm);
+// TODO: Remove for Impala 3.0.
+DEFINE_string(local_nodemanager_url, "", "Deprecated");
+
DECLARE_bool(compact_catalog_topic);
namespace impala {
@@ -361,12 +360,6 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
bind<void>(&ImpalaServer::ExpireQueries, this)));
- is_offline_ = false;
- if (FLAGS_enable_rm) {
- nm_failure_detection_thread_.reset(new Thread("impala-server", "nm-failure-detector",
- bind<void>(&ImpalaServer::DetectNmFailures, this)));
- }
-
exec_env_->SetImpalaServer(this);
}
@@ -783,9 +776,7 @@ Status ImpalaServer::ExecuteInternal(
shared_ptr<QueryExecState>* exec_state) {
DCHECK(session_state != NULL);
*registered_exec_state = false;
- if (IsOffline()) {
- return Status("This Impala server is offline. Please retry your query later.");
- }
+
exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
this, session_state));
@@ -1938,81 +1929,6 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
}
}
-void ImpalaServer::SetOffline(bool is_offline) {
- lock_guard<mutex> l(is_offline_lock_);
- is_offline_ = is_offline;
- ImpaladMetrics::IMPALA_SERVER_READY->set_value(is_offline);
-}
-
-void ImpalaServer::DetectNmFailures() {
- DCHECK(FLAGS_enable_rm);
- if (FLAGS_local_nodemanager_url.empty()) {
- LOG(WARNING) << "No NM address set (--nm_addr is empty), no NM failure detection "
- << "thread started";
- return;
- }
- // We only want a network address to open a socket to, for now. Get rid of http(s)://
- // prefix, and split the string into hostname:port.
- if (istarts_with(FLAGS_local_nodemanager_url, "http://")) {
- FLAGS_local_nodemanager_url =
- FLAGS_local_nodemanager_url.substr(string("http://").size());
- } else if (istarts_with(FLAGS_local_nodemanager_url, "https://")) {
- FLAGS_local_nodemanager_url =
- FLAGS_local_nodemanager_url.substr(string("https://").size());
- }
- vector<string> components;
- split(components, FLAGS_local_nodemanager_url, is_any_of(":"));
- if (components.size() < 2) {
- LOG(ERROR) << "Could not parse network address from --local_nodemanager_url, no NM"
- << " failure detection thread started";
- return;
- }
- DCHECK_GE(components.size(), 2);
- TNetworkAddress nm_addr =
- MakeNetworkAddress(components[0], atoi(components[1].c_str()));
-
- MissedHeartbeatFailureDetector failure_detector(MAX_NM_MISSED_HEARTBEATS,
- MAX_NM_MISSED_HEARTBEATS / 2);
- struct addrinfo* addr;
- if (getaddrinfo(nm_addr.hostname.c_str(), components[1].c_str(), NULL, &addr)) {
- LOG(WARNING) << "Could not resolve NM address: " << nm_addr << ". Error was: "
- << GetStrErrMsg();
- return;
- }
- LOG(INFO) << "Starting NM failure-detection thread, NM at: " << nm_addr;
- // True if the last time through the loop Impala had failed, otherwise false. Used to
- // only change the offline status when there's a change in state.
- bool last_failure_state = false;
- while (true) {
- int sockfd = socket(AF_INET, SOCK_STREAM, 0);
- if (sockfd >= 0) {
- if (connect(sockfd, addr->ai_addr, sizeof(sockaddr)) < 0) {
- failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, false);
- } else {
- failure_detector.UpdateHeartbeat(FLAGS_local_nodemanager_url, true);
- }
- ::close(sockfd);
- } else {
- LOG(ERROR) << "Could not create socket! Error was: " << GetStrErrMsg();
- }
- bool is_failed = (failure_detector.GetPeerState(FLAGS_local_nodemanager_url) ==
- FailureDetector::FAILED);
- if (is_failed != last_failure_state) {
- if (is_failed) {
- LOG(WARNING) <<
- "ImpalaServer is going offline while local node-manager connectivity is bad";
- } else {
- LOG(WARNING) <<
- "Node-manager connectivity has been restored. ImpalaServer is now online";
- }
- SetOffline(is_failed);
- }
- last_failure_state = is_failed;
- SleepForMs(2000);
- }
- freeaddrinfo(addr);
-}
-
void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
const TUpdateFilterParams& params) {
shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f756932..2104c5e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -240,12 +240,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
void CatalogUpdateCallback(const StatestoreSubscriber::TopicDeltaMap& topic_deltas,
std::vector<TTopicDelta>* topic_updates);
- /// Returns true if Impala is offline (and not accepting queries), false otherwise.
- bool IsOffline() {
- boost::lock_guard<boost::mutex> l(is_offline_lock_);
- return is_offline_;
- }
-
/// Returns true if lineage logging is enabled, false otherwise.
bool IsLineageLoggingEnabled();
@@ -633,15 +627,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// FLAGS_idle_query_timeout seconds.
void ExpireQueries();
- /// Periodically opens a socket to FLAGS_local_nodemanager_url to check if the Yarn Node
- /// Manager is running. If not, this method calls SetOffline(true), and when the NM
- /// recovers, calls SetOffline(false). Only called (in nm_failure_detection_thread_) if
- /// FLAGS_enable_rm is true.
- void DetectNmFailures();
-
- /// Set is_offline_ to the argument's value.
- void SetOffline(bool offline);
-
/// Guards query_log_ and query_log_index_
boost::mutex query_log_lock_;
@@ -963,15 +948,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
boost::scoped_ptr<Thread> query_expiration_thread_;
-
- /// Container thread for DetectNmFailures().
- boost::scoped_ptr<Thread> nm_failure_detection_thread_;
-
- /// Protects is_offline_
- boost::mutex is_offline_lock_;
-
- /// True if Impala server is offline, false otherwise.
- bool is_offline_;
};
/// Create an ImpalaServer and Thrift servers.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index de1a56b..55d9ad6 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -54,6 +54,7 @@ DECLARE_int32(beeswax_port);
DECLARE_int32(hs2_port);
DECLARE_int32(be_port);
DECLARE_string(principal);
+DECLARE_bool(enable_rm);
int ImpaladMain(int argc, char** argv) {
InitCommonRuntime(argc, argv, true);
@@ -66,6 +67,13 @@ int ImpaladMain(int argc, char** argv) {
ABORT_IF_ERROR(HiveUdfCall::Init());
InitFeSupport();
+ if (FLAGS_enable_rm) {
+ // TODO: Remove in Impala 3.0.
+ LOG(WARNING) << "*****************************************************************";
+ LOG(WARNING) << "Llama support has been deprecated. FLAGS_enable_rm has no effect.";
+ LOG(WARNING) << "*****************************************************************";
+ }
+
// start backend service for the coordinator on be_port
ExecEnv exec_env;
StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index cea24bb..eb710fb 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -21,7 +21,6 @@
#include "exprs/expr.h"
#include "exprs/expr-context.h"
-#include "resourcebroker/resource-broker.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -48,7 +47,6 @@ using namespace strings;
DECLARE_int32(catalog_service_port);
DECLARE_string(catalog_service_host);
-DECLARE_bool(enable_rm);
DECLARE_int64(max_result_cache_size);
namespace impala {
@@ -428,34 +426,16 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
- if (FLAGS_enable_rm) {
- DCHECK(exec_env_->resource_broker() != NULL);
- }
schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
exec_request_.query_options, &summary_profile_, query_events_));
coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
- if (FLAGS_enable_rm) {
- if (status.ok()) {
- stringstream reservation_request_ss;
- reservation_request_ss << schedule_->reservation_request();
- summary_profile_.AddInfoString("Resource reservation request",
- reservation_request_ss.str());
- }
- }
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
- if (FLAGS_enable_rm && schedule_->HasReservation()) {
- // Add the granted reservation to the query profile.
- stringstream reservation_ss;
- reservation_ss << *schedule_->reservation();
- summary_profile_.AddInfoString("Granted resource reservation", reservation_ss.str());
- query_events_->MarkEvent("Resources reserved");
- }
status = coord_->Exec(*schedule_, &output_expr_ctxs_);
{
lock_guard<mutex> l(lock_);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index da682f4..62fff80 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -36,7 +36,6 @@ add_library(Util
bitmap.cc
bit-util.cc
bloom-filter.cc
- cgroups-mgr.cc
coding-util.cc
codec.cc
compress.cc
@@ -54,7 +53,6 @@ add_library(Util
hdr-histogram.cc
impalad-metrics.cc
jni-util.cc
- llama-util.cc
logging-support.cc
mem-info.cc
memory-metrics.cc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cgroups-mgr.cc b/be/src/util/cgroups-mgr.cc
deleted file mode 100644
index e49d57c..0000000
--- a/be/src/util/cgroups-mgr.cc
+++ /dev/null
@@ -1,238 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/cgroups-mgr.h"
-
-#include <fstream>
-#include <sstream>
-#include <boost/filesystem.hpp>
-#include "util/debug-util.h"
-#include <gutil/strings/substitute.h>
-
-#include "common/names.h"
-
-using boost::filesystem::create_directory;
-using boost::filesystem::exists;
-using boost::filesystem::remove;
-using namespace impala;
-using namespace strings;
-
-namespace impala {
-
-// Suffix appended to Yarn resource ids to form an Impala-internal cgroups.
-const std::string IMPALA_CGROUP_SUFFIX = "_impala";
-
-// Yarn's default multiplier for translating virtual CPU cores into cgroup CPU shares.
-// See Yarn's CgroupsLCEResourcesHandler.java for more details.
-const int32_t CPU_DEFAULT_WEIGHT = 1024;
-
-CgroupsMgr::CgroupsMgr(MetricGroup* metrics) {
- active_cgroups_metric_ = metrics->AddGauge<int64_t>("cgroups-mgr.active-cgroups", 0);
-}
-
-Status CgroupsMgr::Init(const string& cgroups_hierarchy_path,
- const string& staging_cgroup) {
- cgroups_hierarchy_path_ = cgroups_hierarchy_path;
- staging_cgroup_ = staging_cgroup;
- // Set up the staging cgroup for Impala to retire execution threads into.
- RETURN_IF_ERROR(CreateCgroup(staging_cgroup, true));
- return Status::OK();
-}
-
-string CgroupsMgr::UniqueIdToCgroup(const string& unique_id) const {
- if (unique_id.empty()) return "";
- return unique_id + IMPALA_CGROUP_SUFFIX;
-}
-
-int32_t CgroupsMgr::VirtualCoresToCpuShares(int16_t v_cpu_cores) {
- if (v_cpu_cores <= 0) return -1;
- return CPU_DEFAULT_WEIGHT * v_cpu_cores;
-}
-
-Status CgroupsMgr::CreateCgroup(const string& cgroup, bool if_not_exists) const {
- const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
- try {
- // Returns false if the dir already exists, otherwise throws an exception.
- if (!create_directory(cgroup_path) && !if_not_exists) {
- stringstream err_msg;
- err_msg << "Failed to create CGroup at path " << cgroup_path
- << ". Path already exists.";
- return Status(err_msg.str());
- }
- LOG(INFO) << "Created CGroup " << cgroup_path;
- } catch (std::exception& e) {
- stringstream err_msg;
- err_msg << "Failed to create CGroup at path " << cgroup_path << ". " << e.what();
- return Status(err_msg.str());
- }
- return Status::OK();
-}
-
-Status CgroupsMgr::DropCgroup(const string& cgroup, bool if_exists) const {
- const string& cgroup_path = Substitute("$0/$1", cgroups_hierarchy_path_, cgroup);
- LOG(INFO) << "Dropping CGroup " << cgroups_hierarchy_path_ << " " << cgroup;
- try {
- if(!remove(cgroup_path) && !if_exists) {
- stringstream err_msg;
- err_msg << "Failed to create CGroup at path " << cgroup_path
- << ". Path does not exist.";
- return Status(err_msg.str());
- }
- } catch (std::exception& e) {
- stringstream err_msg;
- err_msg << "Failed to drop CGroup at path " << cgroup_path << ". " << e.what();
- return Status(err_msg.str());
- }
- return Status::OK();
-}
-
-Status CgroupsMgr::SetCpuShares(const string& cgroup, int32_t num_shares) {
- string cgroup_path;
- string tasks_path;
- RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
-
- const string& cpu_shares_path = Substitute("$0/$1", cgroup_path, "cpu.shares");
- ofstream cpu_shares(tasks_path.c_str(), ios::out | ios::trunc);
- if (!cpu_shares.is_open()) {
- stringstream err_msg;
- err_msg << "CGroup CPU shares file: " << cpu_shares_path
- << " is not writable by Impala";
- return Status(err_msg.str());
- }
-
- LOG(INFO) << "Setting CPU shares of CGroup " << cgroup_path << " to " << num_shares;
- cpu_shares << num_shares << endl;
- return Status::OK();
-}
-
-Status CgroupsMgr::GetCgroupPaths(const std::string& cgroup,
- std::string* cgroup_path, std::string* tasks_path) const {
- stringstream cgroup_path_ss;
- cgroup_path_ss << cgroups_hierarchy_path_ << "/" << cgroup;
- *cgroup_path = cgroup_path_ss.str();
- if (!exists(*cgroup_path)) {
- stringstream err_msg;
- err_msg << "CGroup " << *cgroup_path << " does not exist";
- return Status(err_msg.str());
- }
-
- stringstream tasks_path_ss;
- tasks_path_ss << *cgroup_path << "/tasks";
- *tasks_path = tasks_path_ss.str();
- if (!exists(*tasks_path)) {
- stringstream err_msg;
- err_msg << "CGroup " << *cgroup_path << " does not have a /tasks file";
- return Status(err_msg.str());
- }
- return Status::OK();
-}
-
-Status CgroupsMgr::AssignThreadToCgroup(const Thread& thread,
- const string& cgroup) const {
- string cgroup_path;
- string tasks_path;
- RETURN_IF_ERROR(GetCgroupPaths(cgroup, &cgroup_path, &tasks_path));
-
- ofstream tasks(tasks_path.c_str(), ios::out | ios::app);
- if (!tasks.is_open()) {
- stringstream err_msg;
- err_msg << "CGroup tasks file: " << tasks_path << " is not writable by Impala";
- return Status(err_msg.str());
- }
- tasks << thread.tid() << endl;
-
- VLOG_ROW << "Thread " << thread.tid() << " moved to CGroup " << cgroup_path;
- tasks.close();
- return Status::OK();
-}
-
-Status CgroupsMgr::RelocateThreads(const string& src_cgroup,
- const string& dst_cgroup) const {
- string src_cgroup_path;
- string src_tasks_path;
- RETURN_IF_ERROR(GetCgroupPaths(src_cgroup, &src_cgroup_path, &src_tasks_path));
-
- string dst_cgroup_path;
- string dst_tasks_path;
- RETURN_IF_ERROR(GetCgroupPaths(dst_cgroup, &dst_cgroup_path, &dst_tasks_path));
-
- ifstream src_tasks(src_tasks_path.c_str());
- if (!src_tasks) {
- stringstream err_msg;
- err_msg << "Failed to open source CGroup tasks file at: " << src_tasks_path;
- return Status(err_msg.str());
- }
-
- ofstream dst_tasks(dst_tasks_path.c_str(), ios::out | ios::app);
- if (!dst_tasks) {
- stringstream err_msg;
- err_msg << "Failed to open destination CGroup tasks file at: " << dst_tasks_path;
- return Status(err_msg.str());
- }
-
- int32_t tid;
- while (src_tasks >> tid) {
- dst_tasks << tid << endl;
- // Attempting to write a non-existent tid/pid will result in an error,
- // so clear the error flags after every append.
- dst_tasks.clear();
- VLOG_ROW << "Relocating thread id " << tid << " from " << src_tasks_path
- << " to " << dst_tasks_path;
- }
-
- return Status::OK();
-}
-
-Status CgroupsMgr::RegisterFragment(const TUniqueId& fragment_instance_id,
- const string& cgroup, bool* is_first) {
- if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK();
-
- LOG(INFO) << "Registering fragment " << PrintId(fragment_instance_id)
- << " with CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
- lock_guard<mutex> l(active_cgroups_lock_);
- if (++active_cgroups_[cgroup] == 1) {
- *is_first = true;
- RETURN_IF_ERROR(CreateCgroup(cgroup, false));
- active_cgroups_metric_->Increment(1);
- } else {
- *is_first = false;
- }
- return Status::OK();
-}
-
-Status CgroupsMgr::UnregisterFragment(const TUniqueId& fragment_instance_id,
- const string& cgroup) {
- if (cgroup.empty() || cgroups_hierarchy_path_.empty()) return Status::OK();
-
- LOG(INFO) << "Unregistering fragment " << PrintId(fragment_instance_id)
- << " from CGroup " << cgroups_hierarchy_path_ << "/" << cgroup;
- lock_guard<mutex> l(active_cgroups_lock_);
- unordered_map<string, int32_t>::iterator entry = active_cgroups_.find(cgroup);
- DCHECK(entry != active_cgroups_.end());
-
- int32_t* ref_count = &entry->second;
- --(*ref_count);
- if (*ref_count == 0) {
- RETURN_IF_ERROR(RelocateThreads(cgroup, staging_cgroup_));
- RETURN_IF_ERROR(DropCgroup(cgroup, false));
- active_cgroups_metric_->Increment(-1);
- active_cgroups_.erase(entry);
- }
- return Status::OK();
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/cgroups-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/util/cgroups-mgr.h b/be/src/util/cgroups-mgr.h
deleted file mode 100644
index 2e52f6b..0000000
--- a/be/src/util/cgroups-mgr.h
+++ /dev/null
@@ -1,175 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_CGROUPS_MGR_H
-#define IMPALA_UTIL_CGROUPS_MGR_H
-
-#include <string>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include "common/status.h"
-#include "util/metrics.h"
-#include "util/thread.h"
-
-namespace impala {
-
-/// Control Groups, or 'cgroups', are a Linux-specific mechanism for arbitrating resources
-/// amongst threads.
-//
-/// CGroups are organised in a forest of 'hierarchies', each of which are mounted at a path
-/// in the filesystem. Each hierarchy contains one or more cgroups, arranged
-/// hierarchically. Each hierarchy has one or more 'subsystems' attached. Each subsystem
-/// represents a resource to manage, so for example there is a CPU subsystem and a MEMORY
-/// subsystem. There are rules about when subsystems may be attached to more than one
-/// hierarchy, which are out of scope of this description.
-//
-/// Each thread running on a kernel with cgroups enabled belongs to exactly one cgroup in
-/// every hierarchy at once. Impala is only concerned with a single hierarchy that assigns
-/// CPU resources in the first instance. Threads are assigned to cgroups by writing their
-/// thread ID to a file in the special cgroup filesystem.
-//
-/// For more information:
-/// access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Resource_Management_Guide/
-/// www.kernel.org/doc/Documentation/cgroups/cgroups.txt
-
-/// Manages the lifecycle of Impala-internal cgroups as well as the assignment of
-/// execution threads into cgroups.
-/// To execute queries Impala requests resources from Yarn via the Llama. Yarn returns
-/// granted resources via the Llama in the form or "RM resource ids" that conventionally
-/// correspond to a CGroups that the Yarn NM creates. Instead of directly using the
-/// NM-provided CGroups, Impala creates and manages its own CGroups for the
-/// following reasons:
-/// 1. In typical CM/Yarn setups, Impala would not have permissions to write to the tasks
-/// file of NM-provided CGroups. It is arguably not even desirable (e.g., for security
-/// reasons) for external process to be able to manipulate the permissions of
-/// NM-generated CGroups either directly or indirectly.
-/// 2. Yarn-granted CGroups are created asynchronously (the AM calls to create the
-/// CGroups are non-blocking). From Impala's perspective that means that once Impala
-/// receives notice from the Llama that resources have been granted, it cannot
-/// assume that the corresponding containers have been created (although the Yarn
-/// NMs eventually will). While each of Impala's plan fragments could wait for the
-/// CGroups to be created, it seems unnecessarily complicated and slow to do so.
-/// 3. Impala will probably want to manage its own CGroups eventually, e.g., for
-/// optimistic query scheduling.
-//
-/// In summary, the typical CGroups-related flow of an Impala query is as follows:
-/// 1. Impala receives granted resources from Llama and sends out plan fragments
-/// 2. On each node execution such a fragment, convert the Yarn resource id into
-/// a CGroup that Impala should create and assign the query's threads to
-/// 3. Register the fragment(s) and the CGroup for the query with the
-/// node-local CGroup manager. The registration creates the CGroup maintains a
-/// count of all fragments using that CGroup.
-/// 4. Execute the fragments, assigning threads into the Impala-managed CGroup.
-/// 5. Complete the fragments by unregistering them with the CGroup from the node-local
-/// CGroups manager. When the last fragment for a CGroup is unregistered, all threads
-/// from that CGroup are relocated into a special staging CGroup, so that the now
-/// unused CGroup can safely be deleted (otherwise, we'd have to wait for the OS to
-/// drain all entries from the CGroup's tasks file)
-class CgroupsMgr {
- public:
- CgroupsMgr(MetricGroup* metrics);
-
- /// Sets the cgroups mgr's corresponding members and creates the staging cgroup
- /// under <cgroups_hierarchy_path>/<staging_cgroup>. Returns a non-OK status if
- /// creation of the staging cgroup failed, e.g., because of insufficient privileges.
- Status Init(const std::string& cgroups_hierarchy_path,
- const std::string& staging_cgroup);
-
- /// Returns the cgroup Impala should create and use for enforcing granted resources
- /// identified by the given unique ID (which usually corresponds to a query ID). Returns
- /// an empty string if unique_id is empty.
- std::string UniqueIdToCgroup(const std::string& unique_id) const;
-
- /// Returns the cgroup CPU shares corresponding to the given number of virtual cores.
- /// Returns -1 if v_cpu_cores is <= 0 (which is invalid).
- int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores);
-
- /// Informs the cgroups mgr that a plan fragment intends to use the given cgroup.
- /// If this is the first fragment requesting use of cgroup, then the cgroup will
- /// be created and *is_first will be set to true (otherwise to false). In any case the
- /// reference count active_cgroups_[cgroup] is incremented. Returns a non-OK status
- /// if there was an error creating the cgroup.
- Status RegisterFragment(const TUniqueId& fragment_instance_id,
- const std::string& cgroup, bool* is_first);
-
- /// Informs the cgroups mgr that a plan fragment using the given cgroup is complete.
- /// Decrements the corresponding reference count active_cgroups_[cgroup]. If the
- /// reference count reaches zero this function relocates all thread ids from
- /// the cgroup to the staging_cgroup_ and drops cgroup (a cgroup with active thread ids
- /// cannot be dropped, so we relocate the thread ids first).
- /// Returns a non-OK status there was an error creating the cgroup.
- Status UnregisterFragment(const TUniqueId& fragment_instance_id,
- const std::string& cgroup);
-
- /// Creates a cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status
- /// if the cgroup creation failed, e.g., because of insufficient privileges.
- /// If is_not_exists is true then no error is returned if the cgroup already exists.
- Status CreateCgroup(const std::string& cgroup, bool if_not_exists) const;
-
- /// Drops the cgroup at <cgroups_hierarchy_path_>/<cgroup>. Returns a non-OK status
- /// if the cgroup deletion failed, e.g., because of insufficient privileges.
- /// If if_exists is true then no error is returned if the cgroup does not exist.
- Status DropCgroup(const std::string& cgroup, bool if_exists) const;
-
- /// Sets the number of CPU shares for the given cgroup by writing num_shares into the
- /// cgroup's cpu.shares file. Returns a non-OK status if there was an error writing
- /// to the file, e.g., because of insufficient privileges.
- Status SetCpuShares(const std::string& cgroup, int32_t num_shares);
-
- /// Assigns a given thread to a cgroup, by writing its thread id to
- /// <cgroups_hierarchy_path_>/<cgroup>/tasks. If there is no file at that
- /// location, returns an error. Otherwise no attempt is made to check that the
- /// target belongs to a cgroup hierarchy due to the cost of reading and parsing
- /// cgroup information from the filesystem.
- Status AssignThreadToCgroup(const Thread& thread, const std::string& cgroup) const;
-
- /// Reads the <cgroups_hierarchy_path_>/<src_cgroup>/tasks file and writing all the
- /// contained thread ids to <cgroups_hierarchy_path_>/<dst_cgroup>/tasks.
- /// Assumes that the destination cgroup has already been created. Returns a non-OK
- /// status if there was an error reading src_cgroup and/or writing dst_cgroup.
- Status RelocateThreads(const std::string& src_cgroup,
- const std::string& dst_cgroup) const;
-
- private:
- /// Checks that the cgroups_hierarchy_path_ and the given cgroup under it exists.
- /// Returns an error if either of them do not exist.
- /// Returns the absolute cgroup path and the absolute path to its tasks file.
- Status GetCgroupPaths(const std::string& cgroup,
- std::string* cgroup_path, std::string* tasks_path) const;
-
- /// Number of currently active Impala-managed cgroups.
- IntGauge* active_cgroups_metric_;
-
- /// Root of the CPU cgroup hierarchy. Created cgroups are placed directly under it.
- std::string cgroups_hierarchy_path_;
-
- /// Cgroup that threads from completed queries are relocated into such that the
- /// query's cgroup can be dropped.
- std::string staging_cgroup_;
-
- /// Protects active_cgroups_.
- boost::mutex active_cgroups_lock_;
-
- /// Process-wide map from cgroup to number of fragments using the cgroup.
- /// A cgroup can be safely dropped once the number of fragments in the cgroup,
- /// according to this map, reaches zero.
- boost::unordered_map<std::string, int32_t> active_cgroups_;
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 4b17ed7..a02a288 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -32,7 +32,6 @@
#include "gen-cpp/RuntimeProfile_types.h"
#include "gen-cpp/ImpalaService_types.h"
#include "gen-cpp/parquet_types.h"
-#include "gen-cpp/Llama_types.h"
#include "runtime/descriptors.h" // for SchemaPath
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/llama-util.cc b/be/src/util/llama-util.cc
deleted file mode 100644
index 82f2bd6..0000000
--- a/be/src/util/llama-util.cc
+++ /dev/null
@@ -1,152 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/llama-util.h"
-
-#include <sstream>
-#include <boost/algorithm/string/join.hpp>
-#include <boost/algorithm/string.hpp>
-
-#include "common/names.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-
-using boost::algorithm::is_any_of;
-using boost::algorithm::join;
-using boost::algorithm::split;
-using namespace llama;
-
-namespace llama {
-
-string PrintId(const TUniqueId& id, const string& separator) {
- return PrintId(impala::CastTUniqueId<TUniqueId, impala::TUniqueId>(id), separator);
-}
-
-ostream& operator<<(ostream& os, const TUniqueId& id) {
- os << hex << id.hi << ":" << id.lo;
- return os;
-}
-
-ostream& operator<<(ostream& os, const TNetworkAddress& address) {
- os << address.hostname << ":" << dec << address.port;
- return os;
-}
-
-ostream& operator<<(ostream& os, const TResource& resource) {
- os << "Resource("
- << "client_resource_id=" << resource.client_resource_id << " "
- << "v_cpu_cores=" << dec << resource.v_cpu_cores << " "
- << "memory_mb=" << dec << resource.memory_mb << " "
- << "asked_location=" << resource.askedLocation << " "
- << "enforcement=" << resource.enforcement << ")";
- return os;
-}
-
-ostream& operator<<(ostream& os, const TAllocatedResource& resource) {
- os << "Allocated Resource("
- << "reservation_id=" << resource.reservation_id << " "
- << "client_resource_id=" << resource.client_resource_id << " "
- << "rm_resource_id=" << resource.rm_resource_id << " "
- << "v_cpu_cores=" << dec << resource.v_cpu_cores << " "
- << "memory_mb=" << dec << resource.memory_mb << " "
- << "location=" << resource.location << ")";
- return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMGetNodesRequest& request) {
- os << "GetNodes Request(llama handle=" << request.am_handle << ")";
- return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMReservationRequest& request) {
- os << "Reservation Request("
- << "llama handle=" << request.am_handle << " "
- << "queue=" << request.queue << " "
- << "user=" << request.user << " "
- << "gang=" << request.gang << " "
- << "resources=[";
- for (int i = 0; i < request.resources.size(); ++i) {
- os << request.resources[i];
- if (i + 1 != request.resources.size()) os << ",";
- }
- os << "])";
- return os;
-}
-
-ostream& operator<<(ostream& os,
- const llama::TLlamaAMReservationExpansionRequest& request) {
- os << "Expansion Request("
- << "llama handle=" << request.am_handle << " "
- << "reservation id=" << request.expansion_of << " "
- << "resource=" << request.resource << ")";
- return os;
-}
-
-ostream& operator<<(ostream& os, const llama::TLlamaAMReleaseRequest& request) {
- os << "Release Request("
- << "llama handle=" << request.am_handle << " "
- << "reservation id=" << request.reservation_id << ")";
- return os;
-}
-
-llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src) {
- dest.lo = src.lo;
- dest.hi = src.hi;
- return dest;
-}
-
-impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src) {
- dest.lo = src.lo;
- dest.hi = src.hi;
- return dest;
-}
-
-bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id) {
- return impala_id.lo == llama_id.lo && impala_id.hi == llama_id.hi;
-}
-
-llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest,
- const impala::TNetworkAddress& src) {
- dest.hostname = src.hostname;
- dest.port = src.port;
- return dest;
-}
-
-impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest,
- const llama::TNetworkAddress& src) {
- dest.hostname = src.hostname;
- dest.port = src.port;
- return dest;
-}
-
-impala::Status LlamaStatusToImpalaStatus(const TStatus& status,
- const string& err_prefix) {
- if (status.status_code == TStatusCode::OK) return impala::Status::OK();
- stringstream ss;
- ss << err_prefix << " " << join(status.error_msgs, ", ");
- return impala::Status(ss.str());
-}
-
-string GetShortName(const string& user) {
- if (user.empty() || user[0] == '/' || user[0] == '@') return user;
-
- vector<string> components;
- split(components, user, is_any_of("/@"));
- return components[0];
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/llama-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/llama-util.h b/be/src/util/llama-util.h
deleted file mode 100644
index f6fc4ce..0000000
--- a/be/src/util/llama-util.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_LLAMA_UTIL_H
-#define IMPALA_UTIL_LLAMA_UTIL_H
-
-#include <ostream>
-#include <string>
-#include <boost/functional/hash.hpp>
-
-#include "gen-cpp/Types_types.h" // for TUniqueId
-#include "gen-cpp/Llama_types.h" // for TUniqueId
-#include "common/status.h"
-
-namespace llama {
-
-std::ostream& operator<<(std::ostream& os, const llama::TUniqueId& id);
-std::ostream& operator<<(std::ostream& os, const llama::TNetworkAddress& address);
-std::ostream& operator<<(std::ostream& os, const llama::TResource& resource);
-std::ostream& operator<<(std::ostream& os, const llama::TAllocatedResource& resource);
-
-std::ostream& operator<<(std::ostream& os,
- const llama::TLlamaAMGetNodesRequest& request);
-std::ostream& operator<<(std::ostream& os,
- const llama::TLlamaAMReservationRequest& request);
-std::ostream& operator<<(std::ostream& os,
- const llama::TLlamaAMReservationExpansionRequest& request);
-std::ostream& operator<<(std::ostream& os,
- const llama::TLlamaAMReleaseRequest& request);
-
-/// 'Assignment' operators to convert types between the llama and impala namespaces.
-llama::TUniqueId& operator<<(llama::TUniqueId& dest, const impala::TUniqueId& src);
-impala::TUniqueId& operator<<(impala::TUniqueId& dest, const llama::TUniqueId& src);
-
-std::string PrintId(const llama::TUniqueId& id, const std::string& separator = ":");
-
-bool operator==(const impala::TUniqueId& impala_id, const llama::TUniqueId& llama_id);
-
-llama::TNetworkAddress& operator<<(llama::TNetworkAddress& dest,
- const impala::TNetworkAddress& src);
-impala::TNetworkAddress& operator<<(impala::TNetworkAddress& dest,
- const llama::TNetworkAddress& src);
-
-impala::Status LlamaStatusToImpalaStatus(const llama::TStatus& status,
- const std::string& err_prefix = "");
-
-/// This function must be called 'hash_value' to be picked up by boost.
-inline std::size_t hash_value(const llama::TUniqueId& id) {
- std::size_t seed = 0;
- boost::hash_combine(seed, id.lo);
- boost::hash_combine(seed, id.hi);
- return seed;
-}
-
-/// Get the short version of the user name (the user's name up to the first '/' or '@')
-/// If neither are found (or are found at the beginning of the user name) return username.
-std::string GetShortName(const std::string& user);
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 8fc1bcc..ccc49c9 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -114,10 +114,6 @@ class ThreadPool {
Join();
}
- Status AssignToCgroup(const std::string& cgroup) {
- return threads_.SetCgroup(cgroup);
- }
-
private:
/// Driver method for each thread in the pool. Continues to read work from the queue
/// until the pool is shutdown.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 6a8b9b8..757ba59 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -26,7 +26,6 @@
#include "util/coding-util.h"
#include "util/debug-util.h"
#include "util/error-util.h"
-#include "util/cgroups-mgr.h"
#include "util/metrics.h"
#include "util/webserver.h"
#include "util/os-util.h"
@@ -321,10 +320,6 @@ void Thread::SuperviseThread(const string& name, const string& category,
Status ThreadGroup::AddThread(Thread* thread) {
threads_.push_back(thread);
- if (!cgroup_path_.empty()) {
- DCHECK(cgroups_mgr_ != NULL);
- RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(*thread, cgroup_path_));
- }
return Status::OK();
}
@@ -332,13 +327,4 @@ void ThreadGroup::JoinAll() {
for (const Thread& thread: threads_) thread.Join();
}
-Status ThreadGroup::SetCgroup(const string& cgroup) {
- DCHECK(cgroups_mgr_ != NULL);
- cgroup_path_ = cgroup;
- for (const Thread& t: threads_) {
- RETURN_IF_ERROR(cgroups_mgr_->AssignThreadToCgroup(t, cgroup));
- }
- return Status::OK();
-}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 8c880d2..4e2b65d 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -31,7 +31,6 @@ namespace impala {
class MetricGroup;
class Webserver;
-class CgroupsMgr;
/// Thin wrapper around boost::thread that can register itself with the singleton
/// ThreadMgr (a private class implemented in thread.cc entirely, which tracks all live
@@ -165,39 +164,19 @@ class ThreadGroup {
public:
ThreadGroup() {}
- ThreadGroup(CgroupsMgr* cgroups_mgr, const std::string& cgroup)
- : cgroups_mgr_(cgroups_mgr), cgroup_path_(cgroup) { }
-
/// Adds a new Thread to this group. The ThreadGroup takes ownership of the Thread, and
/// will destroy it when the ThreadGroup is destroyed. Threads will linger until that
/// point (even if terminated), however, so callers should be mindful of the cost of
/// placing very many threads in this set.
- /// If cgroup_path_ / cgroup_prefix_ are set, the thread will be added to the specified
- /// cgroup and an error will be returned if that operation fails.
Status AddThread(Thread* thread);
/// Waits for all threads to finish. DO NOT call this from a thread inside this set;
/// deadlock will predictably ensue.
void JoinAll();
- /// Assigns all current and future threads to the given cgroup managed by cgroups_mgr_.
- /// Must be called after SetCgroupsMgr() if groups_mgr_ has not been set already.
- /// Returns an error if any assignment was not possible, but does not undo previously
- /// successful assignments.
- Status SetCgroup(const std::string& cgroup);
-
- void SetCgroupsMgr(CgroupsMgr* cgroups_mgr) { cgroups_mgr_ = cgroups_mgr; }
-
private:
/// All the threads grouped by this set.
boost::ptr_vector<Thread> threads_;
-
- /// Cgroups manager for assigning threads in this group to cgroups. Not owned.
- CgroupsMgr* cgroups_mgr_;
-
- /// If not empty, every thread added to this group will also be placed in the
- /// cgroup_path_ managed by the cgroups_mgr_.
- std::string cgroup_path_;
};
/// Initialises the threading subsystem. Must be called before a Thread is created.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index c78a9ea..f0f87ec 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -37,10 +37,7 @@ inline std::size_t hash_value(const impala::TUniqueId& id) {
return seed;
}
-/// Templated so that this method is not namespace-specific (since we also call this on
-/// llama::TUniqueId)
-template <typename T>
-inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, T* unique_id) {
+inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id) {
memcpy(&(unique_id->hi), &uuid.data[0], 8);
memcpy(&(unique_id->lo), &uuid.data[8], 8);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index d8621c4..6524e82 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -22,7 +22,7 @@
# that we can deduce the version settings of the dependencies from the environment.
# IMPALA_TOOLCHAIN indicates the location where the prebuilt artifacts should be extracted
# to. If DOWNLOAD_CDH_COMPONENTS is set to true, this script will also download and extract
-# the CDH components (i.e. Hadoop, Hive, HBase, Llama, Llama-minikdc and Sentry) into
+# the CDH components (i.e. Hadoop, Hive, HBase and Sentry) into
# CDH_COMPONENTS_HOME.
#
# The script is called as follows without any additional parameters:
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/create-test-configuration.sh
----------------------------------------------------------------------
diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index bbe8f61..8d695a4 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -148,7 +148,7 @@ if ${CLUSTER_DIR}/admin is_kerberized; then
# strange, but making these symlinks also results in data loading
# failures in the non-kerberized case. Without these, mapreduce
# jobs die in a kerberized cluster because they can't find their
- # kerberos principals. Obviously this has to be sorted out before
+ # kerberos principals. Obviously this has to be sorted out before
# a kerberized cluster can load data.
echo "Linking yarn and mapred from local cluster"
ln -s ${CLUSTER_HADOOP_CONF_DIR}/yarn-site.xml
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/generate_minidump_collection_testdata.py
----------------------------------------------------------------------
diff --git a/bin/generate_minidump_collection_testdata.py b/bin/generate_minidump_collection_testdata.py
index 27f9a42..a408e05 100755
--- a/bin/generate_minidump_collection_testdata.py
+++ b/bin/generate_minidump_collection_testdata.py
@@ -71,7 +71,6 @@ CONFIG_FILE = '''-beeswax_port=21000
-max_lineage_log_file_size=5000
-hostname=vb0204.halxg.cloudera.com
-state_store_host=vb0202.halxg.cloudera.com
--enable_rm=false
-state_store_port=24000
-catalog_service_host=vb0202.halxg.cloudera.com
-catalog_service_port=26000
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index bfde4c3..b92fcf1 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -41,8 +41,6 @@ parser.add_option("--build_type", dest="build_type", default= 'latest',
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
default=[],
help="Additional arguments to pass to each Impalad during startup")
-parser.add_option("--enable_rm", dest="enable_rm", action="store_true", default=False,
- help="Enable resource management with Yarn and Llama.")
parser.add_option("--state_store_args", dest="state_store_args", action="append",
type="string", default=[],
help="Additional arguments to pass to State Store during startup")
@@ -91,8 +89,6 @@ IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d -be_port=%d "
"-llama_callback_port=%d")
JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
-RM_ARGS = ("-enable_rm=true -llama_addresses=%s -cgroup_hierarchy_path=%s "
- "-fair_scheduler_allocation_path=%s")
CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
# Kills have a timeout to prevent automated scripts from hanging indefinitely.
# It is set to a high value to avoid failing if processes are slow to shut down.
@@ -208,20 +204,6 @@ def build_jvm_args(instance_num):
BASE_JVM_DEBUG_PORT = 30000
return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
-def build_rm_args(instance_num):
- if not options.enable_rm: return ""
- try:
- cgroup_path = cgroups.create_impala_cgroup_path(instance_num + 1)
- except Exception, ex:
- raise RuntimeError("Unable to initialize RM: %s" % str(ex))
- llama_address = "localhost:15000"
-
- # Don't bother checking if the path doesn't exist, the impalad won't start up
- relative_fs_cfg_path = 'cdh%s/node-%d/etc/hadoop/conf/fair-scheduler.xml' %\
- (os.environ.get('CDH_MAJOR_VERSION'), instance_num + 1)
- fs_cfg_path = os.path.join(os.environ.get('CLUSTER_DIR'), relative_fs_cfg_path)
- return RM_ARGS % (llama_address, cgroup_path, fs_cfg_path)
-
def start_impalad_instances(cluster_size):
if cluster_size == 0:
# No impalad instances should be started.
@@ -250,11 +232,10 @@ def start_impalad_instances(cluster_size):
# impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
- args = "--mem_limit=%s %s %s %s %s %s" %\
+ args = "--mem_limit=%s %s %s %s %s" %\
(mem_limit, # Goes first so --impalad_args will override it.
build_impalad_logging_args(i, service_name), build_jvm_args(i),
- build_impalad_port_args(i), param_args,
- build_rm_args(i))
+ build_impalad_port_args(i), param_args)
stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt
index e66bd89..3104ee2 100644
--- a/common/thrift/CMakeLists.txt
+++ b/common/thrift/CMakeLists.txt
@@ -162,7 +162,6 @@ set (SRC_FILES
ImpalaService.thrift
JniCatalog.thrift
LineageGraph.thrift
- Llama.thrift
Logging.thrift
NetworkTest.thrift
MetricDefs.thrift
@@ -171,7 +170,6 @@ set (SRC_FILES
Planner.thrift
Partitions.thrift
parquet.thrift
- ResourceBrokerService.thrift
Results.thrift
RuntimeProfile.thrift
StatestoreService.thrift
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 708cb46..732ea4a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -387,6 +387,7 @@ struct TQueryExecRequest {
// Estimated per-host CPU requirements in YARN virtual cores.
// Used for resource management.
+ // TODO: Remove this and associated code in Planner.
11: optional i16 per_host_vcores
// List of replica hosts. Used by the host_idx field of TScanRangeLocation.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bf03d98..003a618 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -33,7 +33,6 @@ include "DataSinks.thrift"
include "Results.thrift"
include "RuntimeProfile.thrift"
include "ImpalaService.thrift"
-include "Llama.thrift"
// constants for TQueryOptions.num_nodes
const i32 NUM_NODES_ALL = 0
@@ -366,12 +365,6 @@ struct TPlanFragmentInstanceCtx {
// Id of this fragment in its role as a sender.
11: optional i32 sender_id
-
- // Resource reservation to run this plan fragment in.
- 12: optional Llama.TAllocatedResource reserved_resource
-
- // Address of local node manager (used for expanding resource allocations)
- 13: optional Types.TNetworkAddress local_resource_address
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/common/thrift/Llama.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Llama.thrift b/common/thrift/Llama.thrift
deleted file mode 100644
index a9b7f5f..0000000
--- a/common/thrift/Llama.thrift
+++ /dev/null
@@ -1,276 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-namespace cpp llama
-namespace java com.cloudera.llama.thrift
-
-////////////////////////////////////////////////////////////////////////////////
-// DATA TYPES
-
-enum TLlamaServiceVersion {
- V1
-}
-
-struct TUniqueId {
- 1: required i64 hi;
- 2: required i64 lo;
-}
-
-struct TNetworkAddress {
- 1: required string hostname;
- 2: required i32 port;
-}
-
-enum TStatusCode {
- OK,
- REQUEST_ERROR,
- INTERNAL_ERROR
-}
-
-struct TStatus {
- 1: required TStatusCode status_code;
- 2: i16 error_code;
- 3: list<string> error_msgs;
-}
-
-enum TLocationEnforcement {
- MUST,
- PREFERRED,
- DONT_CARE
-}
-
-struct TResource {
- 1: required TUniqueId client_resource_id;
- 2: required i16 v_cpu_cores;
- 3: required i32 memory_mb;
- 4: required string askedLocation;
- 5: required TLocationEnforcement enforcement;
-}
-
-struct TAllocatedResource {
- 1: required TUniqueId reservation_id;
- 2: required TUniqueId client_resource_id;
- 3: required string rm_resource_id;
- 4: required i16 v_cpu_cores;
- 5: required i32 memory_mb;
- 6: required string location;
-}
-
-struct TNodeCapacity {
- 1: required i16 total_v_cpu_cores;
- 2: required i32 total_memory_mb;
- 3: required i16 free_v_cpu_cores;
- 4: required i32 free_memory_mb;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama AM Service
-
-struct TLlamaAMRegisterRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId client_id;
- 3: required TNetworkAddress notification_callback_service;
-}
-
-struct TLlamaAMRegisterResponse {
- 1: required TStatus status;
- 2: optional TUniqueId am_handle;
-}
-
-struct TLlamaAMUnregisterRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
-}
-
-struct TLlamaAMUnregisterResponse {
- 1: required TStatus status;
-}
-
-struct TLlamaAMReservationRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
- 3: required string user;
- 4: optional string queue;
- 5: required list<TResource> resources;
- 6: required bool gang;
- 7: optional TUniqueId reservation_id;
-}
-
-struct TLlamaAMReservationResponse {
- 1: required TStatus status;
- 2: optional TUniqueId reservation_id;
-}
-
-struct TLlamaAMReservationExpansionRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
- 3: required TUniqueId expansion_of;
- 4: required TResource resource;
- 5: optional TUniqueId expansion_id;
-}
-
-struct TLlamaAMReservationExpansionResponse {
- 1: required TStatus status;
- 2: optional TUniqueId expansion_id;
-}
-
-struct TLlamaAMReleaseRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
- 3: required TUniqueId reservation_id;
-}
-
-struct TLlamaAMReleaseResponse {
- 1: required TStatus status;
-}
-
-struct TLlamaAMGetNodesRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
-}
-
-struct TLlamaAMGetNodesResponse {
- 1: required TStatus status;
- 2: optional list<string> nodes;
-}
-
-service LlamaAMService {
-
- TLlamaAMRegisterResponse Register(1: TLlamaAMRegisterRequest request);
-
- TLlamaAMUnregisterResponse Unregister(1: TLlamaAMUnregisterRequest request);
-
- TLlamaAMReservationResponse Reserve(1: TLlamaAMReservationRequest request);
-
- TLlamaAMReservationExpansionResponse Expand(
- 1: TLlamaAMReservationExpansionRequest request);
-
- TLlamaAMReleaseResponse Release(1: TLlamaAMReleaseRequest request);
-
- TLlamaAMGetNodesResponse GetNodes(1: TLlamaAMGetNodesRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama AM Admin Service
-
-struct TLlamaAMAdminReleaseRequest {
- 1: required TLlamaServiceVersion version;
- 2: optional bool do_not_cache = false;
- 3: optional list<string> queues;
- 4: optional list<TUniqueId> handles;
- 5: optional list<TUniqueId> reservations;
-}
-
-struct TLlamaAMAdminReleaseResponse {
- 1: required TStatus status;
-}
-
-struct TLlamaAMAdminEmptyCacheRequest {
- 1: required TLlamaServiceVersion version;
- 2: optional bool allQueues = false;
- 3: optional list<string> queues;
-}
-
-struct TLlamaAMAdminEmptyCacheResponse {
- 1: required TStatus status;
-}
-
-service LlamaAMAdminService {
-
- TLlamaAMAdminReleaseResponse Release
- (1: TLlamaAMAdminReleaseRequest request);
-
- TLlamaAMAdminEmptyCacheResponse EmptyCache
- (1: TLlamaAMAdminEmptyCacheRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama NM Service
-
-struct TLlamaNMRegisterRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId client_id;
- 3: required TNetworkAddress notification_callback_service;
-}
-
-struct TLlamaNMRegisterResponse {
- 1: required TStatus status;
- 2: optional TUniqueId nm_handle;
-}
-
-struct TLlamaNMUnregisterRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId nm_handle;
-}
-
-struct TLlamaNMUnregisterResponse {
- 1: required TStatus status;
-}
-
-service LlamaNMService {
-
- TLlamaNMRegisterResponse Register(1: TLlamaNMRegisterRequest request);
-
- TLlamaNMUnregisterResponse Unregister(1: TLlamaNMUnregisterRequest request);
-
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Llama Notification Callback Service
-
-struct TLlamaAMNotificationRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId am_handle;
- 3: required bool heartbeat;
- 4: optional list<TUniqueId> allocated_reservation_ids;
- 5: optional list<TAllocatedResource> allocated_resources;
- 6: optional list<TUniqueId> rejected_reservation_ids;
- 7: optional list<TUniqueId> rejected_client_resource_ids;
- 8: optional list<TUniqueId> lost_client_resource_ids;
- 9: optional list<TUniqueId> preempted_reservation_ids;
- 10: optional list<TUniqueId> preempted_client_resource_ids;
- 11: optional list<TUniqueId> admin_released_reservation_ids;
- 12: optional list<TUniqueId> lost_reservation_ids;
-}
-
-struct TLlamaAMNotificationResponse {
- 1: required TStatus status;
-}
-
-struct TLlamaNMNotificationRequest {
- 1: required TLlamaServiceVersion version;
- 2: required TUniqueId nm_handle;
- 3: required TNodeCapacity node_capacity;
- 4: list<string> preempted_rm_resource_ids;
-}
-
-struct TLlamaNMNotificationResponse {
- 1: required TStatus status;
-}
-
-service LlamaNotificationService {
-
- TLlamaAMNotificationResponse AMNotification(
- 1: TLlamaAMNotificationRequest request);
-
- TLlamaNMNotificationResponse NMNotification(
- 1: TLlamaNMNotificationRequest request);
-}
-
-////////////////////////////////////////////////////////////////////////////////