Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/28 00:04:14 UTC

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399589040
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <ul>
+   *   <li>Disconnect from Helix cluster, which would close any open clients</li>
+   *   <li>Drop instance from Helix cluster, to remove any partial instance structure from Helix</li>
+   *   <li>Re-construct helix manager instances, used to re-join the cluster</li>
+   * </ul>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
 
 Review comment:
   The intent is to avoid leaving the HelixManager in an inconsistent state, where the underlying ZkClient is connected but a later step of the connect sequence has failed. Disconnecting before retrying ensures that any partial state left over from the previous connect attempt is cleaned up.
   
   We instantiate a new HelixAdmin (separate from the HelixManager) because getClusterManagmentTool() requires the HelixManager to be connected first, which is not the case at this point since we have just disconnected it.
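
To make the ordering concrete, here is a minimal, self-contained sketch of the disconnect, drop, re-create, retry sequence described above. It deliberately does not use the Helix API: `ClusterConnection`, `ClusterJoinRetry`, `FlakyConnection`, and the `dropStaleInstance` hook are hypothetical names invented for illustration, and the retry count is an assumption, not something taken from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper showing the cleanup-before-retry ordering; not Gobblin or Helix code. */
final class ClusterJoinRetry {
  private ClusterJoinRetry() {}

  /**
   * Tries to join up to maxAttempts times. After each failed attempt it:
   *   1. disconnects the possibly half-connected connection,
   *   2. runs dropStaleInstance to remove any partial instance state,
   *   3. builds a brand-new connection from the factory for the next attempt.
   */
  static ClusterConnection joinWithRetry(Supplier<ClusterConnection> factory,
      Runnable dropStaleInstance, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // A fresh object per attempt, so no state leaks from a failed connect.
      ClusterConnection connection = factory.get();
      try {
        connection.connect();
        return connection;
      } catch (Exception e) {
        lastFailure = e;
        connection.disconnect();   // close any clients opened by the failed attempt
        dropStaleInstance.run();   // done via a separate admin handle, not the connection
      }
    }
    throw new IllegalStateException("Could not join after " + maxAttempts + " attempts", lastFailure);
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    int[] failuresLeft = {1};  // the first connect attempt fails, the second succeeds
    joinWithRetry(() -> new FlakyConnection(failuresLeft, events), () -> events.add("drop"), 3);
    System.out.println(events);  // prints [connect, disconnect, drop, connect]
  }
}

/** Hypothetical stand-in for the small part of a cluster manager this sketch needs. */
interface ClusterConnection {
  void connect() throws Exception;

  void disconnect();
}

/** Test double whose connect() fails until a shared failure budget is used up. */
final class FlakyConnection implements ClusterConnection {
  private final int[] failuresLeft;
  private final List<String> events;

  FlakyConnection(int[] failuresLeft, List<String> events) {
    this.failuresLeft = failuresLeft;
    this.events = events;
  }

  @Override
  public void connect() throws Exception {
    events.add("connect");
    if (failuresLeft[0] > 0) {
      failuresLeft[0]--;
      throw new Exception("simulated join failure");
    }
  }

  @Override
  public void disconnect() {
    events.add("disconnect");
  }
}
```

The cleanup hook is passed in separately from the connection on purpose: it mirrors the point above that the drop has to go through an admin handle that does not depend on the (now disconnected) manager.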
