You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jb...@apache.org on 2016/08/25 21:29:19 UTC
[1/2] incubator-impala git commit: IMPALA-3394: Add tests,
make BackendConfig own class, refactor
Repository: incubator-impala
Updated Branches:
refs/heads/master 8cab36cf6 -> 19a2dcfbe
IMPALA-3394: Add tests, make BackendConfig own class, refactor
This change factors SimpleScheduler::BackendConfig into its own class and
adds unit tests for it.
Change-Id: I2d3acb6f68b16ca0af06dad0098d7ec1eff41202
Reviewed-on: http://gerrit.cloudera.org:8080/4116
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f5541d60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f5541d60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f5541d60
Branch: refs/heads/master
Commit: f5541d604046a9f47d1000c2ddc8f38a732c6456
Parents: 8cab36c
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Aug 19 00:36:19 2016 +0200
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 25 20:23:04 2016 +0000
----------------------------------------------------------------------
be/src/scheduling/CMakeLists.txt | 2 +
be/src/scheduling/backend-config-test.cc | 100 ++++++++++++++++++++
be/src/scheduling/backend-config.cc | 89 ++++++++++++++++++
be/src/scheduling/backend-config.h | 75 +++++++++++++++
be/src/scheduling/simple-scheduler.cc | 130 +++-----------------------
be/src/scheduling/simple-scheduler.h | 65 ++-----------
be/src/util/network-util.cc | 38 +++++++-
be/src/util/network-util.h | 20 +++-
8 files changed, 337 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 5ce7ff9..c5b4eb4 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -25,6 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
# TODO: Move other scheduling-related classes here
add_library(Scheduling STATIC
admission-controller.cc
+ backend-config.cc
query-resource-mgr.cc
query-schedule.cc
request-pool-service.cc
@@ -33,5 +34,6 @@ add_library(Scheduling STATIC
add_dependencies(Scheduling thrift-deps)
ADD_BE_TEST(simple-scheduler-test)
+ADD_BE_TEST(backend-config-test)
# TODO: Add BE test
# ADD_BE_TEST(admission-controller-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config-test.cc b/be/src/scheduling/backend-config-test.cc
new file mode 100644
index 0000000..82dc6a5
--- /dev/null
+++ b/be/src/scheduling/backend-config-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "scheduling/backend-config.h"
+
+#include "common/logging.h"
+#include "common/names.h"
+#include "util/network-util.h"
+#include "util/thread.h"
+
+namespace impala {
+
+/// Test that BackendConfig can be created from a vector of backends.
+TEST(BackendConfigTest, MakeFromBackendVector) {
+ // This address needs to be resolvable using getaddrinfo().
+ vector<TNetworkAddress> backends {MakeNetworkAddress("localhost", 1001)};
+ BackendConfig backend_config(backends);
+ IpAddr backend_ip;
+ bool ret = backend_config.LookUpBackendIp(backends[0].hostname, &backend_ip);
+ ASSERT_TRUE(ret);
+ EXPECT_EQ("127.0.0.1", backend_ip);
+}
+
+/// Test adding multiple backends on different hosts.
+TEST(BackendConfigTest, AddBackends) {
+ BackendConfig backend_config;
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+ backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+ ASSERT_EQ(2, backend_config.NumBackends());
+ IpAddr backend_ip;
+ ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+ EXPECT_EQ("10.0.0.1", backend_ip);
+ ASSERT_TRUE(backend_config.LookUpBackendIp("host_2", &backend_ip));
+ EXPECT_EQ("10.0.0.2", backend_ip);
+}
+
+/// Test adding multiple backends on the same host.
+TEST(BackendConfigTest, MultipleBackendsOnSameHost) {
+ BackendConfig backend_config;
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+ IpAddr backend_ip;
+ ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+ EXPECT_EQ("10.0.0.1", backend_ip);
+ const BackendConfig::BackendList& backend_list =
+ backend_config.GetBackendListForHost("10.0.0.1");
+ EXPECT_EQ(2, backend_list.size());
+}
+
+/// Test removing a backend.
+TEST(BackendConfigTest, RemoveBackend) {
+ BackendConfig backend_config;
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+ backend_config.AddBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+ backend_config.RemoveBackend(MakeBackendDescriptor("host_2", "10.0.0.2", 1002));
+ IpAddr backend_ip;
+ ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+ EXPECT_EQ("10.0.0.1", backend_ip);
+ ASSERT_FALSE(backend_config.LookUpBackendIp("host_2", &backend_ip));
+}
+
+/// Test removing one of multiple backends on the same host (IMPALA-3944).
+TEST(BackendConfigTest, RemoveBackendOnSameHost) {
+ BackendConfig backend_config;
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1001));
+ backend_config.AddBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+ backend_config.RemoveBackend(MakeBackendDescriptor("host_1", "10.0.0.1", 1002));
+ IpAddr backend_ip;
+ ASSERT_TRUE(backend_config.LookUpBackendIp("host_1", &backend_ip));
+ EXPECT_EQ("10.0.0.1", backend_ip);
+ const BackendConfig::BackendList& backend_list =
+ backend_config.GetBackendListForHost("10.0.0.1");
+ EXPECT_EQ(1, backend_list.size());
+}
+
+} // end namespace impala
+
+int main(int argc, char **argv) {
+ google::InitGoogleLogging(argv[0]);
+ impala::CpuInfo::Init();
+ impala::InitThreading();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc
new file mode 100644
index 0000000..e5c6824
--- /dev/null
+++ b/be/src/scheduling/backend-config.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/backend-config.h"
+
+namespace impala{
+
+BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends) {
+ // Construct backend_map and backend_ip_map.
+ for (const TNetworkAddress& backend: backends) {
+ IpAddr ip;
+ Status status = HostnameToIpAddr(backend.hostname, &ip);
+ if (!status.ok()) {
+ VLOG(1) << status.GetDetail();
+ continue;
+ }
+ AddBackend(MakeBackendDescriptor(backend.hostname, ip, backend.port));
+ }
+}
+
+const BackendConfig::BackendList& BackendConfig::GetBackendListForHost(
+ const IpAddr& ip) const {
+ BackendMap::const_iterator it = backend_map_.find(ip);
+ DCHECK(it != backend_map_.end());
+ return it->second;
+}
+
+void BackendConfig::GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const {
+ ip_addresses->reserve(NumBackends());
+ for (auto& it: backend_map_) ip_addresses->push_back(it.first);
+}
+
+void BackendConfig::GetAllBackends(BackendList* backends) const {
+ for (const auto& backend_list: backend_map_) {
+ backends->insert(backends->end(), backend_list.second.begin(),
+ backend_list.second.end());
+ }
+}
+
+void BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
+ DCHECK(!be_desc.ip_address.empty());
+ BackendList& be_descs = backend_map_[be_desc.ip_address];
+ if (find(be_descs.begin(), be_descs.end(), be_desc) == be_descs.end()) {
+ be_descs.push_back(be_desc);
+ }
+ backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
+}
+
+void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
+ auto be_descs_it = backend_map_.find(be_desc.ip_address);
+ if (be_descs_it != backend_map_.end()) {
+ BackendList* be_descs = &be_descs_it->second;
+ be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
+ if (be_descs->empty()) {
+ backend_map_.erase(be_descs_it);
+ backend_ip_map_.erase(be_desc.address.hostname);
+ }
+ }
+}
+
+bool BackendConfig::LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const {
+ // Check if hostname is already a valid IP address.
+ if (backend_map_.find(hostname) != backend_map_.end()) {
+ if (ip) *ip = hostname;
+ return true;
+ }
+ auto it = backend_ip_map_.find(hostname);
+ if (it != backend_ip_map_.end()) {
+ if (ip) *ip = it->second;
+ return true;
+ }
+ return false;
+}
+
+} // end ns impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/backend-config.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h
new file mode 100644
index 0000000..25f8292
--- /dev/null
+++ b/be/src/scheduling/backend-config.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SCHEDULING_BACKEND_CONFIG_H
+#define SCHEDULING_BACKEND_CONFIG_H
+
+#include <vector>
+
+#include <boost/unordered_map.hpp>
+
+#include "gen-cpp/StatestoreService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "util/network-util.h"
+
+namespace impala {
+
+/// Configuration class to store a list of backends per IP address and a mapping from
+/// hostnames to IP addresses.
+class BackendConfig {
+ public:
+ BackendConfig() {}
+
+ /// Construct from list of backends.
+ BackendConfig(const std::vector<TNetworkAddress>& backends);
+
+ /// List of Backends.
+ typedef std::list<TBackendDescriptor> BackendList;
+
+ /// Return the list of backends on a particular host. The caller must make sure that the
+ /// host is actually contained in backend_map_.
+ const BackendList& GetBackendListForHost(const IpAddr& ip) const;
+
+ void GetAllBackendIps(std::vector<IpAddr>* ip_addresses) const;
+ void GetAllBackends(BackendList* backends) const;
+ void AddBackend(const TBackendDescriptor& be_desc);
+ void RemoveBackend(const TBackendDescriptor& be_desc);
+
+ /// Look up the IP address of 'hostname' in the internal backend maps and return
+ /// whether the lookup was successful. If 'hostname' itself is a valid IP address and
+ /// is contained in backend_map_, then it is copied to 'ip' and true is returned. 'ip'
+ /// can be NULL if the caller only wants to check whether the lookup succeeds. Use this
+ /// method to resolve datanode hostnames to IP addresses during scheduling, to prevent
+ /// blocking on the OS.
+ bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
+
+ int NumBackends() const { return backend_map_.size(); }
+
+ private:
+ /// Map from a host's IP address to a list of backends running on that node.
+ typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
+ BackendMap backend_map_;
+
+ /// Map from a hostname to its IP address to support hostname based backend lookup. It
+ /// contains entries for all backends in backend_map_ and needs to be updated whenever
+ /// backend_map_ changes.
+ typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
+ BackendIpAddressMap backend_ip_map_;
+};
+
+} // end ns impala
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 5160393..9a865f0 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -185,7 +185,7 @@ Status SimpleScheduler::Init() {
if (metrics_ != NULL) {
// This is after registering with the statestored, so we already have to synchronize
// access to the backend_config_ shared_ptr.
- int num_backends = GetBackendConfig()->backend_map().size();
+ int num_backends = GetBackendConfig()->NumBackends();
total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
@@ -210,10 +210,11 @@ Status SimpleScheduler::Init() {
void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
Document* document) {
- BackendList backends;
- GetAllKnownBackends(&backends);
+ BackendConfig::BackendList backends;
+ BackendConfigPtr backend_config = GetBackendConfig();
+ backend_config->GetAllBackends(&backends);
Value backends_list(kArrayType);
- for (const BackendList::value_type& backend: backends) {
+ for (const TBackendDescriptor& backend: backends) {
Value str(TNetworkAddressToString(backend.address).c_str(), document->GetAllocator());
backends_list.PushBack(str, document->GetAllocator());
}
@@ -334,16 +335,6 @@ void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
backend_config_ = backend_config;
}
-
-void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
- backends->clear();
- BackendConfigPtr backend_config = GetBackendConfig();
- for (const BackendMap::value_type& backend_list: backend_config->backend_map()) {
- backends->insert(backends->end(), backend_list.second.begin(),
- backend_list.second.end());
- }
-}
-
Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
@@ -871,109 +862,21 @@ void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
}
}
-Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) {
- // Try to resolve via the operating system.
- vector<IpAddr> ipaddrs;
- Status status = HostnameToIpAddrs(hostname, &ipaddrs);
- if (!status.ok() || ipaddrs.empty()) {
- stringstream ss;
- ss << "Failed to resolve " << hostname << ": " << status.GetDetail();
- return Status(ss.str());
- }
-
- // HostnameToIpAddrs() calls getaddrinfo() from glibc and will preserve the order of the
- // result. RFC 3484 only specifies a partial order so we need to sort the addresses
- // before picking the first non-localhost one.
- sort(ipaddrs.begin(), ipaddrs.end());
-
- // Try to find a non-localhost address, otherwise just use the first IP address
- // returned.
- *ip = ipaddrs[0];
- if (!FindFirstNonLocalhost(ipaddrs, ip)) {
- VLOG(3) << "Only localhost addresses found for " << hostname;
- }
- return Status::OK();
-}
-
-SimpleScheduler::BackendConfig::BackendConfig(
- const std::vector<TNetworkAddress>& backends) {
- // Construct backend_map and backend_ip_map.
- for (int i = 0; i < backends.size(); ++i) {
- IpAddr ip;
- Status status = HostnameToIpAddr(backends[i].hostname, &ip);
- if (!status.ok()) {
- VLOG(1) << status.GetDetail();
- continue;
- }
-
- BackendMap::iterator it = backend_map_.find(ip);
- if (it == backend_map_.end()) {
- it = backend_map_.insert(
- make_pair(ip, BackendList())).first;
- backend_ip_map_[backends[i].hostname] = ip;
- }
-
- TBackendDescriptor descriptor;
- descriptor.address = MakeNetworkAddress(ip, backends[i].port);
- descriptor.ip_address = ip;
- it->second.push_back(descriptor);
- }
-}
-
-void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
- DCHECK(!be_desc.ip_address.empty());
- BackendList* be_descs = &backend_map_[be_desc.ip_address];
- if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
- be_descs->push_back(be_desc);
- }
- backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
-}
-
-void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
- auto be_descs_it = backend_map_.find(be_desc.ip_address);
- if (be_descs_it != backend_map_.end()) {
- BackendList* be_descs = &be_descs_it->second;
- be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
- if (be_descs->empty()) {
- backend_map_.erase(be_descs_it);
- backend_ip_map_.erase(be_desc.address.hostname);
- }
- }
-}
-
-bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname,
- IpAddr* ip) const {
- // Check if hostname is already a valid IP address.
- if (backend_map_.find(hostname) != backend_map_.end()) {
- if (ip) *ip = hostname;
- return true;
- }
- auto it = backend_ip_map_.find(hostname);
- if (it != backend_ip_map_.end()) {
- if (ip) *ip = it->second;
- return true;
- }
- return false;
-}
-
SimpleScheduler::AssignmentCtx::AssignmentCtx(
const BackendConfig& backend_config,
IntCounter* total_assignments, IntCounter* total_local_assignments)
: backend_config_(backend_config), first_unused_backend_idx_(0),
total_assignments_(total_assignments),
total_local_assignments_(total_local_assignments) {
- random_backend_order_.reserve(backend_map().size());
- for (auto& v: backend_map()) random_backend_order_.push_back(&v);
+ backend_config.GetAllBackendIps(&random_backend_order_);
std::mt19937 g(rand());
std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
// Initialize inverted map for backend rank lookups
int i = 0;
- for (const BackendMap::value_type* v: random_backend_order_) {
- random_backend_rank_[v->first] = i++;
- }
+ for (const IpAddr& ip: random_backend_order_) random_backend_rank_[ip] = i++;
}
-const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
+const IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
DCHECK(!data_locations.empty());
// List of candidate indexes into 'data_locations'.
@@ -1005,7 +908,7 @@ const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBacken
return &data_locations[*min_rank_idx];
}
-const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
+const IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
const IpAddr* candidate_ip;
if (HasUnusedBackends()) {
// Pick next unused backend.
@@ -1024,11 +927,9 @@ bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const {
return first_unused_backend_idx_ < random_backend_order_.size();
}
-const SimpleScheduler::IpAddr*
- SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
+const IpAddr* SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
DCHECK(HasUnusedBackends());
- const IpAddr* ip = &(random_backend_order_[first_unused_backend_idx_++])->first;
- DCHECK(backend_map().find(*ip) != backend_map().end());
+ const IpAddr* ip = &random_backend_order_[first_unused_backend_idx_++];
return ip;
}
@@ -1040,14 +941,14 @@ int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip,
TBackendDescriptor* backend) {
- BackendMap::const_iterator backend_it = backend_map().find(backend_ip);
- DCHECK(backend_it != backend_map().end());
- const BackendList& backends_on_host = backend_it->second;
+ DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL));
+ const BackendConfig::BackendList& backends_on_host =
+ backend_config_.GetBackendListForHost(backend_ip);
DCHECK(backends_on_host.size() > 0);
if (backends_on_host.size() == 1) {
*backend = *backends_on_host.begin();
} else {
- BackendList::const_iterator* next_backend_on_host;
+ BackendConfig::BackendList::const_iterator* next_backend_on_host;
next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip,
backends_on_host.begin());
DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
@@ -1078,7 +979,6 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
IpAddr backend_ip;
backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
DCHECK(!backend_ip.empty());
- DCHECK(backend_map().find(backend_ip) != backend_map().end());
assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
GetBackendRank(backend_ip));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index a6ab93e..dd119c2 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -34,6 +34,7 @@
#include "util/metrics.h"
#include "util/runtime-profile.h"
#include "scheduling/admission-controller.h"
+#include "scheduling/backend-config.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "gen-cpp/ResourceBrokerService_types.h"
#include "rapidjson/rapidjson.h"
@@ -95,55 +96,10 @@ class SimpleScheduler : public Scheduler {
virtual void HandleLostResource(const TUniqueId& client_resource_id);
private:
- /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
- typedef std::string Hostname;
-
- /// Type to store IPv4 addresses.
- typedef std::string IpAddr;
-
- typedef std::list<TBackendDescriptor> BackendList;
-
- /// Map from a host's IP address to a list of backends running on that node.
- typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
-
/// Map from a host's IP address to the next backend to be round-robin scheduled for
/// that host (needed for setups with multiple backends on a single host)
- typedef boost::unordered_map<IpAddr, BackendList::const_iterator> NextBackendPerHost;
-
- /// Map from a hostname to its IP address to support hostname based backend lookup.
- typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
-
- /// Configuration class to store a list of backends per IP address and a mapping from
- /// hostnames to IP addresses. backend_ip_map contains entries for all backends in
- /// backend_map and needs to be updated whenever backend_map changes. Each plan node
- /// creates a read-only copy of the scheduler's current backend_config_ to use during
- /// scheduling.
- class BackendConfig {
- public:
- BackendConfig() {}
-
- /// Construct config from list of backends.
- BackendConfig(const std::vector<TNetworkAddress>& backends);
-
- void AddBackend(const TBackendDescriptor& be_desc);
- void RemoveBackend(const TBackendDescriptor& be_desc);
-
- /// Look up the IP address of 'hostname' in the internal backend maps and return
- /// whether the lookup was successful. If 'hostname' itself is a valid IP address then
- /// it is copied to 'ip' and true is returned. 'ip' can be NULL if the caller only
- /// wants to check whether the lookup succeeds. Use this method to resolve datanode
- /// hostnames to IP addresses during scheduling, to prevent blocking on the OS.
- bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
-
- int NumBackends() const { return backend_map().size(); }
-
- const BackendMap& backend_map() const { return backend_map_; }
- const BackendIpAddressMap& backend_ip_map() const { return backend_ip_map_; }
-
- private:
- BackendMap backend_map_;
- BackendIpAddressMap backend_ip_map_;
- };
+ typedef boost::unordered_map<IpAddr, BackendConfig::BackendList::const_iterator>
+ NextBackendPerHost;
typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
@@ -250,7 +206,6 @@ class SimpleScheduler : public Scheduler {
FragmentScanRangeAssignment* assignment);
const BackendConfig& backend_config() const { return backend_config_; }
- const BackendMap& backend_map() const { return backend_config_.backend_map(); }
/// Print the assignment and statistics to VLOG_FILE.
void PrintAssignment(const FragmentScanRangeAssignment& assignment);
@@ -279,7 +234,7 @@ class SimpleScheduler : public Scheduler {
int first_unused_backend_idx_;
/// Store a random permutation of backend hosts to select backends from.
- std::vector<const BackendMap::value_type*> random_backend_order_;
+ std::vector<IpAddr> random_backend_order_;
/// Track round robin information per backend host.
NextBackendPerHost next_backend_per_host_;
@@ -301,7 +256,9 @@ class SimpleScheduler : public Scheduler {
/// The scheduler's backend configuration. When receiving changes to the backend
/// configuration from the statestore we will make a copy of the stored object, apply
- /// the updates to the copy and atomically swap the contents of this pointer.
+ /// the updates to the copy and atomically swap the contents of this pointer. Each plan
+ /// node creates a read-only copy of the scheduler's current backend_config_ to use
+ /// during scheduling.
BackendConfigPtr backend_config_;
/// Protect access to backend_config_ which might otherwise be updated asynchronously
@@ -382,9 +339,6 @@ class SimpleScheduler : public Scheduler {
BackendConfigPtr GetBackendConfig() const;
void SetBackendConfig(const BackendConfigPtr& backend_config);
- /// Return a list of all backends registered with the scheduler.
- void GetAllKnownBackends(BackendList* backends);
-
/// Add the granted reservation and resources to the active_reservations_ and
/// active_client_resources_ maps, respectively.
void AddToActiveResourceMaps(
@@ -517,11 +471,6 @@ class SimpleScheduler : public Scheduler {
int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
const TQueryExecRequest& exec_request);
- /// Deterministically resolve a host to one of its IP addresses. This method will call
- /// into the OS, so it can take a long time to return. Use this method to resolve
- /// hostnames during initialization and while processing statestore updates.
- static Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
-
friend class impala::SchedulerWrapper;
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc
index bce350d..afa5b32 100644
--- a/be/src/util/network-util.cc
+++ b/be/src/util/network-util.cc
@@ -59,16 +59,18 @@ Status GetHostname(string* hostname) {
return Status::OK();
}
-Status HostnameToIpAddrs(const string& name, vector<string>* addresses) {
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){
+ // Try to resolve via the operating system.
+ vector<IpAddr> addresses;
addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_INET; // IPv4 addresses only
hints.ai_socktype = SOCK_STREAM;
struct addrinfo* addr_info;
- if (getaddrinfo(name.c_str(), NULL, &hints, &addr_info) != 0) {
+ if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) {
stringstream ss;
- ss << "Could not find IPv4 address for: " << name;
+ ss << "Could not find IPv4 address for: " << hostname;
return Status(ss.str());
}
@@ -79,15 +81,32 @@ Status HostnameToIpAddrs(const string& name, vector<string>* addresses) {
inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64);
if (result == NULL) {
stringstream ss;
- ss << "Could not convert IPv4 address for: " << name;
+ ss << "Could not convert IPv4 address for: " << hostname;
freeaddrinfo(addr_info);
return Status(ss.str());
}
- addresses->push_back(string(addr_buf));
+ addresses.push_back(string(addr_buf));
it = it->ai_next;
}
freeaddrinfo(addr_info);
+
+ if (addresses.empty()) {
+ stringstream ss;
+ ss << "Could not convert IPv4 address for: " << hostname;
+ return Status(ss.str());
+ }
+
+ // RFC 3484 only specifies a partial order for the result of getaddrinfo() so we need to
+ // sort the addresses before picking the first non-localhost one.
+ sort(addresses.begin(), addresses.end());
+
+ // Try to find a non-localhost address, otherwise just use the first IP address
+ // returned.
+ *ip = addresses[0];
+ if (!FindFirstNonLocalhost(addresses, ip)) {
+ VLOG(3) << "Only localhost addresses found for " << hostname;
+ }
return Status::OK();
}
@@ -128,6 +147,15 @@ TNetworkAddress MakeNetworkAddress(const string& address) {
return ret;
}
+/// Utility method because Thrift does not supply useful constructors
+TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip,
+ int port) {
+ TBackendDescriptor be_desc;
+ be_desc.address = MakeNetworkAddress(hostname, port);
+ be_desc.ip_address = ip;
+ return be_desc;
+}
+
bool IsWildcardAddress(const string& ipaddress) {
return ipaddress == "0.0.0.0";
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f5541d60/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 57e95a1..315d451 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -16,15 +16,23 @@
// under the License.
#include "common/status.h"
+#include "gen-cpp/StatestoreService_types.h"
#include "gen-cpp/Types_types.h"
#include <vector>
namespace impala {
-/// Looks up all IP addresses associated with a given hostname. Returns
-/// an error status if any system call failed, otherwise OK. Even if OK
-/// is returned, addresses may still be of zero length.
-Status HostnameToIpAddrs(const std::string& name, std::vector<std::string>* addresses);
+/// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
+typedef std::string Hostname;
+
+/// Type to store IPv4 addresses.
+typedef std::string IpAddr;
+
+/// Looks up all IP addresses associated with a given hostname and returns one of them via
+/// 'ip'. If the IP addresses of a host don't change, then subsequent calls will always
+/// return the same address. Returns an error status if any system call failed or if no
+/// IPv4 address could be determined for the host, otherwise OK.
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
/// Finds the first non-localhost IP address in the given list. Returns
/// true if such an address was found, false otherwise.
@@ -43,6 +51,10 @@ TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port);
/// hostname and a port of 0.
TNetworkAddress MakeNetworkAddress(const std::string& address);
+/// Utility method because Thrift does not supply useful constructors
+TBackendDescriptor MakeBackendDescriptor(const Hostname& hostname, const IpAddr& ip,
+ int port);
+
/// Returns true if the ip address parameter is the wildcard interface (0.0.0.0)
bool IsWildcardAddress(const std::string& ipaddress);
[2/2] incubator-impala git commit: Add .clang-format for Impala's C++
style
Posted by jb...@apache.org.
Add .clang-format for Impala's C++ style
Change-Id: I274c5993c7be344fc4b7729d21a13da993f9f3aa
Reviewed-on: http://gerrit.cloudera.org:8080/3886
Reviewed-by: Jim Apple <jb...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19a2dcfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19a2dcfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19a2dcfb
Branch: refs/heads/master
Commit: 19a2dcfbec14f3d1ada8aec833462b8a48915df1
Parents: f5541d6
Author: Jim Apple <jb...@cloudera.com>
Authored: Wed Aug 10 10:48:30 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 25 21:17:08 2016 +0000
----------------------------------------------------------------------
.clang-format | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19a2dcfb/.clang-format
----------------------------------------------------------------------
diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..6d31ebc
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,13 @@
+BasedOnStyle: Google
+AlignAfterOpenBracket: 'false'
+AlignOperands: 'false'
+AllowShortFunctionsOnASingleLine: 'Inline'
+AllowShortIfStatementsOnASingleLine: 'true'
+BreakBeforeBinaryOperators: 'NonAssignment'
+BreakBeforeTernaryOperators: 'false'
+ColumnLimit: '90'
+ConstructorInitializerIndentWidth: '2'
+ContinuationIndentWidth: '4'
+PenaltyBreakBeforeFirstCallParameter: '99999999'
+SpacesBeforeTrailingComments: '1'
+Standard: 'Cpp11'
\ No newline at end of file