You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/01/09 21:40:00 UTC

[gobblin] branch master updated: Handle d2 markup case when not only announcing leader (#3622)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2668e41 Handle d2 markup case when not only announcing leader (#3622)
cb2668e41 is described below

commit cb2668e411867c0f50d8774d7853ced73acb36b8
Author: umustafi <um...@gmail.com>
AuthorDate: Mon Jan 9 13:39:54 2023 -0800

    Handle d2 markup case when not only announcing leader (#3622)
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
 .../modules/core/GobblinServiceManager.java        | 93 ++++++++++++----------
 1 file changed, 52 insertions(+), 41 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8845410a7..8e113909e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -297,58 +297,60 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
    * @param changeContext notification context
    */
   private void handleLeadershipChange(NotificationContext changeContext) {
-    if (this.helixManager.isPresent() && this.helixManager.get().isLeader()) {
-      LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
-          this.helixManager.get().isLeader());
+    if (this.helixManager.isPresent()) {
+      if (this.helixManager.get().isLeader()) {
+        LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
+            this.helixManager.get().isLeader());
 
-      if (configuration.isSchedulerEnabled()) {
-        LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
-        this.scheduler.setActive(true);
-      }
+        if (configuration.isSchedulerEnabled()) {
+          LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
+          this.scheduler.setActive(true);
+        }
 
-      if (helixLeaderGauges.isPresent()) {
-        helixLeaderGauges.get().setState(LeaderState.MASTER);
-      }
+        if (helixLeaderGauges.isPresent()) {
+          helixLeaderGauges.get().setState(LeaderState.MASTER);
+        }
 
-      if (configuration.isGitConfigMonitorEnabled()) {
-        this.gitConfigMonitor.setActive(true);
-      }
+        if (configuration.isGitConfigMonitorEnabled()) {
+          this.gitConfigMonitor.setActive(true);
+        }
 
-      if (configuration.isDagManagerEnabled()) {
-        //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
-        if (this.topologyCatalog.getInitComplete().getCount() == 0) {
-          this.dagManager.setActive(true);
-          this.eventBus.register(this.dagManager);
+        if (configuration.isDagManagerEnabled()) {
+          //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
+          if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+            this.dagManager.setActive(true);
+            this.eventBus.register(this.dagManager);
+          }
         }
-      }
 
-      if (configuration.isOnlyAnnounceLeader()) {
-        this.d2Announcer.markUpServer();
-      }
-    } else if (this.helixManager.isPresent()) {
-      LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
-          this.helixManager.get().isLeader());
+        if (configuration.isOnlyAnnounceLeader()) {
+          this.d2Announcer.markUpServer();
+        }
+      } else {
+        LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
+            this.helixManager.get().isLeader());
 
-      if (configuration.isSchedulerEnabled()) {
-        LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
-        this.scheduler.setActive(false);
-      }
+        if (configuration.isSchedulerEnabled()) {
+          LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
+          this.scheduler.setActive(false);
+        }
 
-      if (helixLeaderGauges.isPresent()) {
-        helixLeaderGauges.get().setState(LeaderState.SLAVE);
-      }
+        if (helixLeaderGauges.isPresent()) {
+          helixLeaderGauges.get().setState(LeaderState.SLAVE);
+        }
 
-      if (configuration.isGitConfigMonitorEnabled()) {
-        this.gitConfigMonitor.setActive(false);
-      }
+        if (configuration.isGitConfigMonitorEnabled()) {
+          this.gitConfigMonitor.setActive(false);
+        }
 
-      if (configuration.isDagManagerEnabled()) {
-        this.dagManager.setActive(false);
-        this.eventBus.unregister(this.dagManager);
-      }
+        if (configuration.isDagManagerEnabled()) {
+          this.dagManager.setActive(false);
+          this.eventBus.unregister(this.dagManager);
+        }
 
-      if (configuration.isOnlyAnnounceLeader()) {
-        this.d2Announcer.markDownServer();
+        if (configuration.isOnlyAnnounceLeader()) {
+          this.d2Announcer.markDownServer();
+        }
       }
     }
   }
@@ -476,6 +478,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       }
     }
 
+    // Announce to d2 after services are initialized regardless of leadership if configuration is not enabled
+    if (!this.configuration.isOnlyAnnounceLeader()) {
+      this.d2Announcer.markUpServer();
+    }
+
     // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
     // This has to be done after the topologyCatalog service is launched
     if (configuration.isTopologySpecFactoryEnabled()) {
@@ -515,6 +522,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     LOGGER.info("Stopping the Gobblin Service Manager");
     this.stopInProgress = true;
     try {
+      // Stop announcing GaaS instances to d2 when services are stopped
+      if (!configuration.isOnlyAnnounceLeader()) {
+        this.d2Announcer.markDownServer();
+      }
       this.serviceLauncher.stop();
     } catch (ApplicationException ae) {
       LOGGER.error("Error while stopping Gobblin Service Manager", ae);