You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/01/09 21:40:00 UTC
[gobblin] branch master updated: Handle d2 markup case when not only announcing leader (#3622)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cb2668e41 Handle d2 markup case when not only announcing leader (#3622)
cb2668e41 is described below
commit cb2668e411867c0f50d8774d7853ced73acb36b8
Author: umustafi <um...@gmail.com>
AuthorDate: Mon Jan 9 13:39:54 2023 -0800
Handle d2 markup case when not only announcing leader (#3622)
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
.../modules/core/GobblinServiceManager.java | 93 ++++++++++++----------
1 file changed, 52 insertions(+), 41 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8845410a7..8e113909e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -297,58 +297,60 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
* @param changeContext notification context
*/
private void handleLeadershipChange(NotificationContext changeContext) {
- if (this.helixManager.isPresent() && this.helixManager.get().isLeader()) {
- LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
- this.helixManager.get().isLeader());
+ if (this.helixManager.isPresent()) {
+ if (this.helixManager.get().isLeader()) {
+ LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
+ this.helixManager.get().isLeader());
- if (configuration.isSchedulerEnabled()) {
- LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
- this.scheduler.setActive(true);
- }
+ if (configuration.isSchedulerEnabled()) {
+ LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
+ this.scheduler.setActive(true);
+ }
- if (helixLeaderGauges.isPresent()) {
- helixLeaderGauges.get().setState(LeaderState.MASTER);
- }
+ if (helixLeaderGauges.isPresent()) {
+ helixLeaderGauges.get().setState(LeaderState.MASTER);
+ }
- if (configuration.isGitConfigMonitorEnabled()) {
- this.gitConfigMonitor.setActive(true);
- }
+ if (configuration.isGitConfigMonitorEnabled()) {
+ this.gitConfigMonitor.setActive(true);
+ }
- if (configuration.isDagManagerEnabled()) {
- //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
- if (this.topologyCatalog.getInitComplete().getCount() == 0) {
- this.dagManager.setActive(true);
- this.eventBus.register(this.dagManager);
+ if (configuration.isDagManagerEnabled()) {
+ //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
+ if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+ this.dagManager.setActive(true);
+ this.eventBus.register(this.dagManager);
+ }
}
- }
- if (configuration.isOnlyAnnounceLeader()) {
- this.d2Announcer.markUpServer();
- }
- } else if (this.helixManager.isPresent()) {
- LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
- this.helixManager.get().isLeader());
+ if (configuration.isOnlyAnnounceLeader()) {
+ this.d2Announcer.markUpServer();
+ }
+ } else {
+ LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
+ this.helixManager.get().isLeader());
- if (configuration.isSchedulerEnabled()) {
- LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
- this.scheduler.setActive(false);
- }
+ if (configuration.isSchedulerEnabled()) {
+ LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
+ this.scheduler.setActive(false);
+ }
- if (helixLeaderGauges.isPresent()) {
- helixLeaderGauges.get().setState(LeaderState.SLAVE);
- }
+ if (helixLeaderGauges.isPresent()) {
+ helixLeaderGauges.get().setState(LeaderState.SLAVE);
+ }
- if (configuration.isGitConfigMonitorEnabled()) {
- this.gitConfigMonitor.setActive(false);
- }
+ if (configuration.isGitConfigMonitorEnabled()) {
+ this.gitConfigMonitor.setActive(false);
+ }
- if (configuration.isDagManagerEnabled()) {
- this.dagManager.setActive(false);
- this.eventBus.unregister(this.dagManager);
- }
+ if (configuration.isDagManagerEnabled()) {
+ this.dagManager.setActive(false);
+ this.eventBus.unregister(this.dagManager);
+ }
- if (configuration.isOnlyAnnounceLeader()) {
- this.d2Announcer.markDownServer();
+ if (configuration.isOnlyAnnounceLeader()) {
+ this.d2Announcer.markDownServer();
+ }
}
}
}
@@ -476,6 +478,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
}
+ // Announce to d2 after services are initialized regardless of leadership if configuration is not enabled
+ if (!this.configuration.isOnlyAnnounceLeader()) {
+ this.d2Announcer.markUpServer();
+ }
+
// Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
// This has to be done after the topologyCatalog service is launched
if (configuration.isTopologySpecFactoryEnabled()) {
@@ -515,6 +522,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
LOGGER.info("Stopping the Gobblin Service Manager");
this.stopInProgress = true;
try {
+ // Stop announcing GaaS instances to d2 when services are stopped
+ if (!configuration.isOnlyAnnounceLeader()) {
+ this.d2Announcer.markDownServer();
+ }
this.serviceLauncher.stop();
} catch (ApplicationException ae) {
LOGGER.error("Error while stopping Gobblin Service Manager", ae);