You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/08/21 09:04:48 UTC

[GitHub] huijunwu closed pull request #1868: replace hard coded tmaster ports in tmaster_unittest

huijunwu closed pull request #1868: replace hard coded tmaster ports in tmaster_unittest
URL: https://github.com/apache/incubator-heron/pull/1868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the full diff is reproduced below:

diff --git a/heron/common/src/cpp/network/httpserver.cpp b/heron/common/src/cpp/network/httpserver.cpp
index 75de800ab6..c54fc3738e 100644
--- a/heron/common/src/cpp/network/httpserver.cpp
+++ b/heron/common/src/cpp/network/httpserver.cpp
@@ -47,13 +47,25 @@ sp_int32 HTTPServer::Start() {
             << "0.0.0.0"
             << ":" << port;
   // Bind to INADDR_ANY instead of using the hostname
-  sp_int32 retval = evhttp_bind_socket(http_, "0.0.0.0", port);
-  if (retval == 0) {
+  auto retval = evhttp_bind_socket_with_handle(http_, "0.0.0.0", port);
+  if (retval != nullptr) {
     // record the successfully bound hostport
     hostports_.push_back(std::make_pair(host, port));
+    // fetch the actual port
+    if (0 == port) {
+      evutil_socket_t fd = evhttp_bound_socket_get_fd(retval);
+      struct sockaddr_storage ss;
+      memset(&ss, 0, sizeof(ss));
+      ev_socklen_t socklen = sizeof(ss);
+      if (getsockname(fd, (struct sockaddr *)&ss, &socklen)) {
+        PLOG(ERROR) << "getsockname() failed";
+        return SP_NOTOK;
+      }
+      options_.set_port(ntohs(((struct sockaddr_in*)&ss)->sin_port));
+    }
   } else {
     // failed to bind on the socket
-    return retval;
+    return SP_NOTOK;
   }
   evhttp_set_max_body_size(http_, options_.get_max_packet_size());
   return SP_OK;
diff --git a/heron/common/src/cpp/network/httpserver.h b/heron/common/src/cpp/network/httpserver.h
index df9ff94157..2ce613761e 100644
--- a/heron/common/src/cpp/network/httpserver.h
+++ b/heron/common/src/cpp/network/httpserver.h
@@ -60,6 +60,7 @@ class HTTPServer {
 
   // Accessors
   EventLoop* getEventLoop() { return eventLoop_; }
+  const NetworkOptions& get_serveroptions() const { return options_; }
 
  private:
   void HandleHTTPRequest(struct evhttp_request* _request);
diff --git a/heron/tmaster/src/cpp/manager/stats-interface.h b/heron/tmaster/src/cpp/manager/stats-interface.h
index ac30a918ee..9ff9a69d6a 100644
--- a/heron/tmaster/src/cpp/manager/stats-interface.h
+++ b/heron/tmaster/src/cpp/manager/stats-interface.h
@@ -33,6 +33,8 @@ class StatsInterface {
                  TMetricsCollector* _collector, TMaster* tmaster);
   virtual ~StatsInterface();
 
+  const NetworkOptions& get_serveroptions() const { return http_server_->get_serveroptions(); }
+
  private:
   void HandleStatsRequest(IncomingHTTPRequest* _request);
   void HandleUnknownRequest(IncomingHTTPRequest* _request);
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.h b/heron/tmaster/src/cpp/manager/tcontroller.h
index 578506fc9e..fe082853c5 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.h
+++ b/heron/tmaster/src/cpp/manager/tcontroller.h
@@ -34,6 +34,8 @@ class TController {
   // Starts the controller
   sp_int32 Start();
 
+  const NetworkOptions& get_serveroptions() const { return http_server_->get_serveroptions(); }
+
   // Called by the tmaster when it gets response form ckptmgr
   void HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status);
 
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 6a17393642..aa60fe1c03 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -84,8 +84,6 @@ TMaster::TMaster(const std::string& _zk_hostport, const std::string& _topology_n
 
   mMetricsMgrClient = new heron::common::MetricsMgrSt(
       mMetricsMgrPort, metricsExportIntervalSec, eventLoop_);
-  mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
-                           "0");  // MM expects task_id, so just giving 0 for tmaster.
 
   tmasterProcessMetrics = new heron::common::MultiAssignableMetric();
   mMetricsMgrClient->register_metric(METRIC_PREFIX, tmasterProcessMetrics);
@@ -106,6 +104,12 @@ TMaster::TMaster(const std::string& _zk_hostport, const std::string& _topology_n
   do_reassign_ = false;
 
   master_establish_attempts_ = 0;
+
+  // start servers to get the actual ports before `tmaster_location_` related operation
+  StartServers();
+  mMetricsMgrClient->Start(myhost_name_, master_port_, "__tmaster__",
+                           "0");  // MM expects task_id, so just giving 0 for tmaster.
+
   tmaster_location_ = new proto::tmaster::TMasterLocation();
   tmaster_location_->set_topology_name(_topology_name);
   tmaster_location_->set_topology_id(_topology_id);
@@ -202,6 +206,68 @@ void TMaster::OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
   }
 }
 
+const NetworkOptions& TMaster::getMasterOptions() const {
+  return master_->get_serveroptions();
+}
+const NetworkOptions& TMaster::getControllerOptions() const {
+  return tmaster_controller_->get_serveroptions();
+}
+const NetworkOptions& TMaster::getStatsOptions() const {
+  return stats_->get_serveroptions();
+}
+
+void TMaster::StartServers() {
+  NetworkOptions master_options;
+  master_options.set_host(myhost_name_);
+  master_options.set_port(master_port_);
+  master_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
+                                         ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
+                                     1_MB);
+  master_options.set_socket_family(PF_INET);
+  master_ = new TMasterServer(eventLoop_, master_options, metrics_collector_, this);
+
+  sp_int32 retval = master_->Start();
+  if (retval != SP_OK) {
+    LOG(FATAL) << "Failed to start TMaster Master Server with rcode: " << retval;
+  } else {
+    master_port_ = master_->get_serveroptions().get_port();
+  }
+
+  // Port for the scheduler to connect to
+  NetworkOptions controller_options;
+  controller_options.set_host(myhost_name_);
+  controller_options.set_port(tmaster_controller_port_);
+  controller_options.set_max_packet_size(
+      config::HeronInternalsConfigReader::Instance()
+          ->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
+      1_MB);
+  controller_options.set_socket_family(PF_INET);
+  tmaster_controller_ = new TController(eventLoop_, controller_options, this);
+
+  retval = tmaster_controller_->Start();
+  if (retval != SP_OK) {
+    LOG(FATAL) << "Failed to start TMaster Controller Server with rcode: " << retval;
+  } else {
+    tmaster_controller_port_ =   tmaster_controller_->get_serveroptions().get_port();
+  }
+
+  // Http port for stat queries
+  NetworkOptions stats_options;
+  if (config::HeronInternalsConfigReader::Instance()
+          ->GetHeronTmasterMetricsNetworkBindAllInterfaces()) {
+    stats_options.set_host("0.0.0.0");
+  } else {
+    stats_options.set_host(myhost_name_);
+  }
+  stats_options.set_port(stats_port_);
+  stats_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
+                                        ->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
+                                    1_MB);
+  stats_options.set_socket_family(PF_INET);
+  stats_ = new StatsInterface(eventLoop_, stats_options, metrics_collector_, this);
+  stats_port_ = stats_->get_serveroptions().get_port();
+}
+
 void TMaster::EstablishTMaster(EventLoop::Status) {
   auto cb = [this](proto::system::StatusCode code) { this->SetTMasterLocationDone(code); };
 
@@ -499,53 +565,6 @@ void TMaster::GetPhysicalPlanDone(proto::system::PhysicalPlan* _pplan,
       stateful_controller_->RegisterNewPhysicalPlan(*current_pplan_);
     }
   }
-
-  // Now that we have our state all setup, its time to start accepting requests
-  // Port for the stmgrs to connect to
-  NetworkOptions master_options;
-  master_options.set_host(myhost_name_);
-  master_options.set_port(master_port_);
-  master_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
-                                         ->GetHeronTmasterNetworkMasterOptionsMaximumPacketMb() *
-                                     1_MB);
-  master_options.set_socket_family(PF_INET);
-  master_ = new TMasterServer(eventLoop_, master_options, metrics_collector_, this);
-
-  sp_int32 retval = master_->Start();
-  if (retval != SP_OK) {
-    LOG(FATAL) << "Failed to start TMaster Master Server with rcode: " << retval;
-  }
-
-  // Port for the scheduler to connect to
-  NetworkOptions controller_options;
-  controller_options.set_host(myhost_name_);
-  controller_options.set_port(tmaster_controller_port_);
-  controller_options.set_max_packet_size(
-      config::HeronInternalsConfigReader::Instance()
-          ->GetHeronTmasterNetworkControllerOptionsMaximumPacketMb() *
-      1_MB);
-  controller_options.set_socket_family(PF_INET);
-  tmaster_controller_ = new TController(eventLoop_, controller_options, this);
-
-  retval = tmaster_controller_->Start();
-  if (retval != SP_OK) {
-    LOG(FATAL) << "Failed to start TMaster Controller Server with rcode: " << retval;
-  }
-
-  // Http port for stat queries
-  NetworkOptions stats_options;
-  if (config::HeronInternalsConfigReader::Instance()
-          ->GetHeronTmasterMetricsNetworkBindAllInterfaces()) {
-    stats_options.set_host("0.0.0.0");
-  } else {
-    stats_options.set_host(myhost_name_);
-  }
-  stats_options.set_port(stats_port_);
-  stats_options.set_max_packet_size(config::HeronInternalsConfigReader::Instance()
-                                        ->GetHeronTmasterNetworkStatsOptionsMaximumPacketMb() *
-                                    1_MB);
-  stats_options.set_socket_family(PF_INET);
-  stats_ = new StatsInterface(eventLoop_, stats_options, metrics_collector_, this);
 }
 
 void TMaster::ActivateTopology(VCallback<proto::system::StatusCode> cb) {
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index bab615e6e5..03422279c7 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -84,6 +84,10 @@ class TMaster {
   // Now used in GetMetrics function in tmetrics-collector
   const proto::api::Topology* getInitialTopology() const { return topology_; }
 
+  const NetworkOptions& getMasterOptions() const;
+  const NetworkOptions& getControllerOptions() const;
+  const NetworkOptions& getStatsOptions() const;
+
   // Timer function to start the stateful checkpoint process
   void SendCheckpointMarker();
 
@@ -152,6 +156,9 @@ class TMaster {
   // Function called when we want to setup ourselves as tmaster
   void EstablishTMaster(EventLoop::Status);
 
+  // Start 3 servers: master server, controller server and stats server
+  void StartServers();
+
   void EstablishPackingPlan(EventLoop::Status);
   void FetchPackingPlan();
   void OnPackingPlanFetch(proto::system::PackingPlan* newPackingPlan,
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index aee840b069..0875eb2479 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -258,13 +258,15 @@ void StartServer(EventLoopImpl* ss) {
 void StartTMaster(EventLoopImpl*& ss, heron::tmaster::TMaster*& tmaster,
                   std::thread*& tmaster_thread, const sp_string& zkhostportlist,
                   const sp_string& topology_name, const sp_string& topology_id,
-                  const sp_string& dpath, const sp_string& tmaster_host, sp_int32 tmaster_port,
-                  sp_int32 tmaster_controller_port, sp_int32 ckptmgr_port) {
+                  const sp_string& dpath, const sp_string& tmaster_host, sp_int32& tmaster_port,
+                  sp_int32& tmaster_controller_port, sp_int32 ckptmgr_port) {
   ss = new EventLoopImpl();
   tmaster = new heron::tmaster::TMaster(zkhostportlist, topology_name, topology_id, dpath,
-                                  tmaster_controller_port, tmaster_port, tmaster_port + 2,
+                                  tmaster_controller_port, tmaster_port, 0,
                                   tmaster_port + 3, ckptmgr_port,
                                   metrics_sinks_config_filename, LOCALHOST, ss);
+  tmaster_port = tmaster->getMasterOptions().get_port();
+  tmaster_controller_port = tmaster->getControllerOptions().get_port();
   tmaster_thread = new std::thread(StartServer, ss);
   // tmaster_thread->start();
 }
@@ -426,8 +428,8 @@ void StartStMgrs(CommonResources& common) {
 void SetUpCommonResources(CommonResources& common) {
   // Initialize dummy params
   common.tmaster_host_ = LOCALHOST;
-  common.tmaster_port_ = 53001;
-  common.tmaster_controller_port_ = 53002;
+  common.tmaster_port_ = 0;
+  common.tmaster_controller_port_ = 0;
   common.ckptmgr_port_ = 53003;
   common.stmgr_baseport_ = 53001;
   common.topology_name_ = "mytopology";
@@ -502,6 +504,8 @@ TEST(StMgr, test_pplan_distribute) {
                                     common.num_stmgrs_);
   // Start the tmaster etc.
   StartTMaster(common);
+  LOG(INFO) << "tmaster master port " << common.tmaster_port_ << std::endl;
+  LOG(INFO) << "tmaster controller port " << common.tmaster_controller_port_ << std::endl;
 
   // Distribute workers across stmgrs
   DistributeWorkersAcrossStmgrs(common);
@@ -549,6 +553,8 @@ TEST(StMgr, test_activate_deactivate) {
                                     common.num_stmgrs_);
   // Start the tmaster etc.
   StartTMaster(common);
+  LOG(INFO) << "tmaster master port " << common.tmaster_port_ << std::endl;
+  LOG(INFO) << "tmaster controller port " << common.tmaster_controller_port_ << std::endl;
 
   // Distribute workers across stmgrs
   DistributeWorkersAcrossStmgrs(common);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services