You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/26 23:18:11 UTC

[GitHub] [incubator-gobblin] sv2000 opened a new pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

sv2000 opened a new pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940
 
 
   …ters
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1099
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   A Yarn application may leave behind orphaned containers, which can happen due to lost node managers. The orphaned containers however can continue to run (potentially forever) as participants in the Helix cluster. This can cause the following problems for a Gobblin-on-Yarn application:
   
   Double publish of data and commit of state
   Task failures and partition starvation during application restarts, as Helix may assign tasks to the orphaned containers which have a stale state and configuration
   Container failures on application restarts due to Helix instance name collisions with orphaned containers
   
   This PR incorporates the following changes to handle orphaned containers:
   1. Disables live instances during Yarn application start up and shutdown in GobblinYarnAppLauncher. This ensures that any orphaned Helix instances are disabled from joining the cluster on restart and any tasks running on these instances are cancelled.
   2. The GobblinApplicationMaster (inside YarnService) ensures that Helix instance name assigned to a newly allocated container is not a currently live instance to avoid instance name collisions. We also handle NM failures/restarts that result in Yarn RM "aborting" the container (i.e. container is deemed dead from Yarn's point of view, even though the container may physically be alive). In this case, we disable the instance to ensure it is fenced off from the Helix cluster.
   3. Gobblin workers (i.e. GobblinYarnTaskRunner) retry in case of failure in joining a Helix cluster. The retry logic disconnects from the Helix cluster and drops the instance before reattempting to join the cluster. 
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Added unit test in GobblinTaskRunnerTest
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#issuecomment-604751963
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=h1) Report
   > Merging [#2940](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/a63461257c3fcea8f4ff67087f8cb29be25d6baf&el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `41.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2940      +/-   ##
   ============================================
   - Coverage     44.60%   44.60%   -0.01%     
   - Complexity     8980     8981       +1     
   ============================================
     Files          1936     1936              
     Lines         73234    73296      +62     
     Branches       8083     8089       +6     
   ============================================
   + Hits          32669    32695      +26     
   - Misses        37515    37550      +35     
   - Partials       3050     3051       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/gobblin/yarn/GobblinApplicationMaster.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbkFwcGxpY2F0aW9uTWFzdGVyLmphdmE=) | `17.80% <0.00%> (-0.25%)` | `3.00 <0.00> (ø)` | |
   | [...rg/apache/gobblin/yarn/GobblinYarnAppLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5BcHBMYXVuY2hlci5qYXZh) | `20.66% <0.00%> (-0.51%)` | `8.00 <0.00> (ø)` | |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `34.12% <18.18%> (-4.14%)` | `14.00 <1.00> (ø)` | |
   | [...main/java/org/apache/gobblin/yarn/YarnService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFyblNlcnZpY2UuamF2YQ==) | `15.71% <22.22%> (+0.44%)` | `4.00 <0.00> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `66.01% <75.00%> (+1.85%)` | `29.00 <4.00> (+2.00)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `53.91% <100.00%> (ø)` | `26.00 <1.00> (ø)` | |
   | [...ava/org/apache/gobblin/fsm/FiniteStateMachine.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZzbS9GaW5pdGVTdGF0ZU1hY2hpbmUuamF2YQ==) | `73.48% <0.00%> (-3.04%)` | `18.00% <0.00%> (-3.00%)` | |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `70.00% <0.00%> (-2.23%)` | `13.00% <0.00%> (ø%)` | |
   | ... and [3 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=footer). Last update [a634612...2915df6](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] asfgit closed pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399575480
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -82,6 +91,7 @@
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+
 
 Review comment:
   Empty line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399583642
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
 ##########
 @@ -188,6 +193,7 @@
   // into the queue if the container running the instance completes. Unused Helix
   // instance names get picked up when replacement containers get allocated.
   private final ConcurrentLinkedQueue<String> unusedHelixInstanceNames = Queues.newConcurrentLinkedQueue();
+  private boolean reuseUnusedHelixInstanceNames = true;
 
 Review comment:
   what's the usage of this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#issuecomment-604735451
 
 
   @autumnust @htran1 please review.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399594383
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    //Drop the helix Instance
+    logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+    HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
 
 Review comment:
   Changed it to return void. The dropInstanceIfExists method is intended to swallow HelixException that can only occur due to instance path not being present i.e instance does not exist. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399584405
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 ##########
 @@ -479,6 +488,26 @@ void connectHelixManager() {
     }
   }
 
+  /**
+   * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application
+   * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
+   * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
+   * new cluster.
+   *
+   * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
+   * completion, this method should be removed.
+   */
+  void disableLiveHelixInstances() {
+    String clusterName = this.helixManager.getClusterName();
+    HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
+    List<String> liveInstances = HelixUtils.getLiveInstances(this.helixManager);
+    LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size());
+    for (String instanceName: liveInstances) {
+      LOGGER.warn("Disabling instance: {}", instanceName);
+      helixAdmin.enableInstance(clusterName, instanceName, false);
 
 Review comment:
   Are there any mechanism from Helix side to sort of retry-joining cluster after being disabled? Just want to make sure

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399590340
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 ##########
 @@ -479,6 +488,26 @@ void connectHelixManager() {
     }
   }
 
+  /**
+   * A method to disable pre-existing live instances in a Helix cluster. This can happen when a previous Yarn application
+   * leaves behind orphaned Yarn worker processes. Since Helix does not provide an API to drop a live instance, we use
+   * the disable instance API to fence off these orphaned instances and prevent them from becoming participants in the
+   * new cluster.
+   *
+   * NOTE: this is a workaround for an existing YARN bug. Once YARN has a fix to guarantee container kills on application
+   * completion, this method should be removed.
+   */
+  void disableLiveHelixInstances() {
+    String clusterName = this.helixManager.getClusterName();
+    HelixAdmin helixAdmin = this.helixManager.getClusterManagmentTool();
+    List<String> liveInstances = HelixUtils.getLiveInstances(this.helixManager);
+    LOGGER.warn("Found {} live instances in the cluster.", liveInstances.size());
+    for (String instanceName: liveInstances) {
+      LOGGER.warn("Disabling instance: {}", instanceName);
+      helixAdmin.enableInstance(clusterName, instanceName, false);
 
 Review comment:
   No there is none. The instance is part of the cluster, but disabled. It needs to re-enabled to start getting task assignments from Helix.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399580058
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    //Drop the helix Instance
+    logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+    HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
 
 Review comment:
   The return value of this method is ignored, is that intentional? Or shall we leave the handling of exception to caller of this static method ? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] codecov-io commented on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#issuecomment-604751963
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=h1) Report
   > Merging [#2940](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/a63461257c3fcea8f4ff67087f8cb29be25d6baf&el=desc) will **decrease** coverage by `40.49%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2940       +/-   ##
   ============================================
   - Coverage     44.60%   4.11%   -40.50%     
   + Complexity     8980     750     -8230     
   ============================================
     Files          1936    1936               
     Lines         73234   73296       +62     
     Branches       8083    8089        +6     
   ============================================
   - Hits          32669    3016    -29653     
   - Misses        37515   69960    +32445     
   + Partials       3050     320     -2730     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (-53.92%)` | `0.00 <0.00> (-26.00)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `0.00% <0.00%> (-64.16%)` | `0.00 <0.00> (-27.00)` | |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `0.00% <0.00%> (-38.27%)` | `0.00 <0.00> (-14.00)` | |
   | [...n/compaction/action/CompactionWatermarkAction.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vYWN0aW9uL0NvbXBhY3Rpb25XYXRlcm1hcmtBY3Rpb24uamF2YQ==) | `0.00% <0.00%> (-77.05%)` | `0.00 <0.00> (-11.00)` | |
   | [...che/gobblin/salesforce/ResultChainingIterator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvUmVzdWx0Q2hhaW5pbmdJdGVyYXRvci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/yarn/GobblinApplicationMaster.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbkFwcGxpY2F0aW9uTWFzdGVyLmphdmE=) | `0.00% <0.00%> (-18.06%)` | `0.00 <0.00> (-3.00)` | |
   | [...rg/apache/gobblin/yarn/GobblinYarnAppLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5BcHBMYXVuY2hlci5qYXZh) | `0.00% <0.00%> (-21.17%)` | `0.00 <0.00> (-8.00)` | |
   | [...main/java/org/apache/gobblin/yarn/YarnService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFyblNlcnZpY2UuamF2YQ==) | `0.00% <0.00%> (-15.28%)` | `0.00 <0.00> (-4.00)` | |
   | [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | ... and [1150 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=footer). Last update [a634612...2915df6](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399583366
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
 ##########
 @@ -635,10 +644,26 @@ protected void handleContainerCompletion(ContainerStatus containerStatus) {
           containerStatus.getContainerId(), containerStatus.getDiagnostics()));
     }
 
-    if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
-      LOGGER.info("Container release requested, so not spawning a replacement for containerId {}",
-          containerStatus.getContainerId());
-      return;
+    if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
+      if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
+        LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+        return;
+      } else {
+        LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+       // Container release was not requested. Likely, the container was running on a node on which the NM died.
+       // In this case, RM assumes that the containers are "lost", even though the container process may still be
+        // running on the node. We need to ensure that the Helix instances running on the orphaned containers
+        // are fenced off from the Helix cluster to avoid double publishing and state being committed by the
+        // instances.
+        if (!UNKNOWN_HELIX_INSTANCE.equals(completedInstanceName)) {
+          String clusterName = this.helixManager.getClusterName();
+          //Disable the orphaned instance.
+          if (HelixUtils.isInstanceLive(helixManager, completedInstanceName)) {
+            LOGGER.info("Disabling the Helix instance {}", completedInstanceName);
+            this.helixManager.getClusterManagmentTool().enableInstance(clusterName, completedInstanceName, false);
 
 Review comment:
   enableInstance, This is a very good API :) 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399589040
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
 
 Review comment:
   The intent was to avoid a state where the HelixManager is in an inconsistent state where the underlying ZkClient is connected but there is a failure later. By disconnecting before retrying, we ensure any partial state during the previous connect attempt is cleaned up. 
   
   The reason we are instantiating a new HelixAdmin instance (separate from HelixManager) is that getClusterManagementTool() requires HelixManager to be connected first. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399589999
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
 ##########
 @@ -188,6 +193,7 @@
   // into the queue if the container running the instance completes. Unused Helix
   // instance names get picked up when replacement containers get allocated.
   private final ConcurrentLinkedQueue<String> unusedHelixInstanceNames = Queues.newConcurrentLinkedQueue();
+  private boolean reuseUnusedHelixInstanceNames = true;
 
 Review comment:
   Thanks! Removed this variable.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399582595
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
 ##########
 @@ -625,7 +634,7 @@ private boolean shouldStickToTheSameNode(int containerExitStatus) {
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
-    String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerEntry == null?  UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
 
 Review comment:
   Just curious, What kind of situation result in a containerId is not in containerMap? can we also add javadoc here

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] codecov-io edited a comment on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#issuecomment-604751963
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=h1) Report
   > Merging [#2940](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/a63461257c3fcea8f4ff67087f8cb29be25d6baf&el=desc) will **decrease** coverage by `40.49%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2940       +/-   ##
   ============================================
   - Coverage     44.60%   4.10%   -40.50%     
   + Complexity     8980     750     -8230     
   ============================================
     Files          1936    1936               
     Lines         73234   73292       +58     
     Branches       8083    8088        +5     
   ============================================
   - Hits          32669    3012    -29657     
   - Misses        37515   69960    +32445     
   + Partials       3050     320     -2730     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `0.00% <0.00%> (-53.92%)` | `0.00 <0.00> (-26.00)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `0.00% <0.00%> (-64.16%)` | `0.00 <0.00> (-27.00)` | |
   | [...in/java/org/apache/gobblin/cluster/HelixUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSGVsaXhVdGlscy5qYXZh) | `0.00% <0.00%> (-38.27%)` | `0.00 <0.00> (-14.00)` | |
   | [.../apache/gobblin/yarn/GobblinApplicationMaster.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbkFwcGxpY2F0aW9uTWFzdGVyLmphdmE=) | `0.00% <0.00%> (-18.06%)` | `0.00 <0.00> (-3.00)` | |
   | [...rg/apache/gobblin/yarn/GobblinYarnAppLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5BcHBMYXVuY2hlci5qYXZh) | `0.00% <0.00%> (-21.17%)` | `0.00 <0.00> (-8.00)` | |
   | [...main/java/org/apache/gobblin/yarn/YarnService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFyblNlcnZpY2UuamF2YQ==) | `0.00% <0.00%> (-15.28%)` | `0.00 <0.00> (-4.00)` | |
   | [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...java/org/apache/gobblin/stream/ControlMessage.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0NvbnRyb2xNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/gobblin/dataset/DatasetResolver.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YXNldC9EYXRhc2V0UmVzb2x2ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | ... and [1150 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2940/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=footer). Last update [a634612...fe7e1bf](https://codecov.io/gh/apache/incubator-gobblin/pull/2940?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399579077
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    //Drop the helix Instance
+    logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+    HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
+
+    if (this.taskDriverHelixManager.isPresent()) {
+      String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY);
+      logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster);
+      HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName);
+    }
+    admin.close();
+
+    logger.warn("Reinitializing Helix manager..");
+    initHelixManager();
+  }
+
+  @VisibleForTesting
+  void connectHelixManagerWithRetry() {
+    Callable<Void> connectHelixManagerCallable = () -> {
+      try {
+        logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName);
+        connectHelixManager();
+      } catch (HelixException e) {
+        logger.error("Exception encountered when joining cluster", e);
+        onClusterJoinFailure();
+        throw e;
       }
-    } catch (Exception e) {
-      logger.error("HelixManager failed to connect", e);
+      return null;
+    };
+
+    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
+        .retryIfException()
+        .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build();
+
+    try {
+      retryer.call(connectHelixManagerCallable);
+    } catch (ExecutionException | RetryException e) {
+      logger.error("Connecting to Helix manager failed", e);
 
 Review comment:
   let's avoid log.error and having an unchecked exception propagated at the same time: It doubled stack traces which is not very necessary and confused reader on how many times the exception actually happen. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399590474
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -82,6 +91,7 @@
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
+
 
 Review comment:
   Fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399590389
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    //Drop the helix Instance
+    logger.warn("Dropping instance: {} from cluster: {}", helixInstanceName, clusterName);
+    HelixUtils.dropInstanceIfExists(admin, clusterName, helixInstanceName);
+
+    if (this.taskDriverHelixManager.isPresent()) {
+      String taskDriverCluster = clusterConfig.getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY);
+      logger.warn("Dropping instance: {} from task driver cluster: {}", helixInstanceName, taskDriverCluster);
+      HelixUtils.dropInstanceIfExists(admin, taskDriverCluster, helixInstanceName);
+    }
+    admin.close();
+
+    logger.warn("Reinitializing Helix manager..");
+    initHelixManager();
+  }
+
+  @VisibleForTesting
+  void connectHelixManagerWithRetry() {
+    Callable<Void> connectHelixManagerCallable = () -> {
+      try {
+        logger.info("Instance: {} attempting to join cluster: {}", helixInstanceName, clusterName);
+        connectHelixManager();
+      } catch (HelixException e) {
+        logger.error("Exception encountered when joining cluster", e);
+        onClusterJoinFailure();
+        throw e;
       }
-    } catch (Exception e) {
-      logger.error("HelixManager failed to connect", e);
+      return null;
+    };
+
+    Retryer<Void> retryer = RetryerBuilder.<Void>newBuilder()
+        .retryIfException()
+        .withStopStrategy(StopStrategies.stopAfterAttempt(5)).build();
+
+    try {
+      retryer.call(connectHelixManagerCallable);
+    } catch (ExecutionException | RetryException e) {
+      logger.error("Connecting to Helix manager failed", e);
 
 Review comment:
   Thanks! Will fix it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399578539
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 ##########
 @@ -366,20 +377,74 @@ boolean isStopped() {
   }
 
   @VisibleForTesting
-  void connectHelixManager() {
-    try {
-      this.jobHelixManager.connect();
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
-              new ParticipantShutdownMessageHandlerFactory());
-      this.jobHelixManager.getMessagingService()
-          .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-              getUserDefinedMessageHandlerFactory());
-      if (this.taskDriverHelixManager.isPresent()) {
-        this.taskDriverHelixManager.get().connect();
+  void connectHelixManager() throws Exception {
+    this.jobHelixManager.connect();
+    //Ensure the instance is enabled.
+    this.jobHelixManager.getClusterManagmentTool().enableInstance(clusterName, helixInstanceName, true);
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE,
+            new ParticipantShutdownMessageHandlerFactory());
+    this.jobHelixManager.getMessagingService()
+        .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
+            getUserDefinedMessageHandlerFactory());
+    if (this.taskDriverHelixManager.isPresent()) {
+      this.taskDriverHelixManager.get().connect();
+      //Ensure the instance is enabled.
+      this.taskDriverHelixManager.get().getClusterManagmentTool().enableInstance(this.taskDriverHelixManager.get().getClusterName(), helixInstanceName, true);
+    }
+  }
+
+  /**
+   * A method to handle failures joining Helix cluster. The method will perform the following steps before attempting
+   * to re-join the cluster:
+   * <li>
+   *   <ul>Disconnect from Helix cluster, which would close any open clients</ul>
+   *   <ul>Drop instance from Helix cluster, to remove any partial instance structure from Helix</ul>
+   *   <ul>Re-construct helix manager instances, used to re-join the cluster</ul>
+   * </li>
+   */
+  private void onClusterJoinFailure() {
+    logger.warn("Disconnecting Helix manager..");
+    disconnectHelixManager();
+
+    HelixAdmin admin = new ZKHelixAdmin(clusterConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
 
 Review comment:
   I couldn't fully get the purpose of disconnect from existing helixManager, creating a HelixAdmin and use the newly-created admin to conduct instance-dropping. Isn't `helixManager.getClusterManagmentTool()` essentially a same HelixAdmin that we can use to drop instance ? What's the gain of disconnecting helixManager? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2940: GOBBLIN-1099: Handle orphaned Yarn containers in Gobblin-on-Yarn clus…
URL: https://github.com/apache/incubator-gobblin/pull/2940#discussion_r399593342
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
 ##########
 @@ -625,7 +634,7 @@ private boolean shouldStickToTheSameNode(int containerExitStatus) {
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
     Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
-    String completedInstanceName = completedContainerEntry == null? "unknown" : completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerEntry == null?  UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
 
 Review comment:
   Added a comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services