You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/18 00:07:45 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1120] Reinitialize HelixManager when Helix participant check throws an exception[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a634b4  [GOBBLIN-1120] Reinitialize HelixManager when Helix participant check throws an exception[]
7a634b4 is described below

commit 7a634b450958e44783f236da4513008a4b8d95ec
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 17 17:07:35 2020 -0700

    [GOBBLIN-1120] Reinitialize HelixManager when Helix participant check throws an exception[]
    
    Closes #2960 from
    sv2000/helixAssignedParticipantCheck
---
 .../cluster/HelixAssignedParticipantCheck.java     | 53 +++++++++++++++-------
 .../cluster/HelixAssignedParticipantCheckTest.java |  8 ++++
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 065eb6f..2102365 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -32,8 +31,10 @@ import com.github.rholder.retry.RetryException;
 import com.github.rholder.retry.Retryer;
 import com.github.rholder.retry.RetryerBuilder;
 import com.github.rholder.retry.StopStrategies;
+import com.google.common.annotations.VisibleForTesting;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alias;
@@ -52,6 +53,8 @@ import org.apache.gobblin.commit.CommitStepException;
 @Slf4j
 @Alias (value = "HelixParticipantCheck")
 public class HelixAssignedParticipantCheck implements CommitStep {
+  @Getter
+  @VisibleForTesting
   private static volatile HelixManager helixManager = null;
   private static volatile Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
       .retryIfException()
@@ -61,6 +64,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
   private final String helixInstanceName;
   private final String helixJob;
   private final int partitionNum;
+  private final Config config;
 
   private boolean isCompleted;
 
@@ -69,7 +73,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
    * @param config
    * @return
    */
-  public static HelixManager getHelixManager(Config config) {
+  public static void initHelixManager(Config config) throws Exception {
     if (helixManager == null) {
       synchronized (HelixAssignedParticipantCheck.class) {
         if (helixManager == null) {
@@ -77,14 +81,29 @@ public class HelixAssignedParticipantCheck implements CommitStep {
           String clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
           helixManager = HelixManagerFactory.getZKHelixManager(clusterName, HelixAssignedParticipantCheck.class.getSimpleName(),
               InstanceType.SPECTATOR, zkConnectString);
+          helixManager.connect();
         }
       }
     }
-    return helixManager;
   }
 
-  public HelixAssignedParticipantCheck(Config config) {
-    getHelixManager(config);
+  /**
+   * Refresh the {@link HelixManager} instance. Invoked when the underlying ZkClient is closed, causing Helix
+   * APIs to throw an Exception.
+   * @throws Exception if disconnecting the existing instance or re-initializing the {@link HelixManager} fails
+   */
+  private void refreshHelixManager() throws Exception {
+    synchronized (HelixAssignedParticipantCheck.class) {
+      //Ensure existing instance is disconnected to close any open connections.
+      helixManager.disconnect();
+      helixManager = null;
+      initHelixManager(config);
+    }
+  }
+
+  public HelixAssignedParticipantCheck(Config config) throws Exception {
+    this.config = config;
+    initHelixManager(config);
     this.helixInstanceName = config.getString(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY);
     this.helixJob = config.getString(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY);
     this.partitionNum = config.getInt(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY);
@@ -94,8 +113,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
    * Determine whether the commit step has been completed.
    */
   @Override
-  public boolean isCompleted()
-      throws IOException {
+  public boolean isCompleted() {
     return isCompleted;
   }
 
@@ -104,20 +122,23 @@ public class HelixAssignedParticipantCheck implements CommitStep {
    */
   @Override
   public void execute() throws CommitStepException {
-    if (!helixManager.isConnected()) {
-      try {
-        helixManager.connect();
-      } catch (Exception e) {
-        throw new CommitStepException(String.format("Helix instance %s unable to connect to Helix/ZK", helixInstanceName));
-      }
-    }
-    TaskDriver taskDriver = new TaskDriver(helixManager);
     log.info(String.format("HelixParticipantCheck step called for Helix Instance: %s, Helix job: %s, Helix partition: %d",
         this.helixInstanceName, this.helixJob, this.partitionNum));
 
     //Query Helix to get the currently assigned participant for the Helix partitionNum
     Callable callable = () -> {
-      JobContext jobContext = taskDriver.getJobContext(helixJob);
+      JobContext jobContext;
+      try {
+        TaskDriver taskDriver = new TaskDriver(helixManager);
+        jobContext = taskDriver.getJobContext(helixJob);
+      } catch (Exception e) {
+        log.info("Encountered exception when executing " + getClass().getSimpleName(), e);
+        log.info("Refreshing Helix manager..");
+        refreshHelixManager();
+        //Rethrow the exception to trigger a retry.
+        throw e;
+      }
+
       if (jobContext != null) {
         String participant = jobContext.getAssignedParticipant(partitionNum);
         if (participant != null) {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index 1aecd94..40c341e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -84,6 +84,14 @@ public class HelixAssignedParticipantCheckTest {
     // (i.e. no exceptions thrown).
     check.execute();
 
+    //Disconnect the HelixManager used to check the assigned participant to force an Exception on the first attempt.
+    //The check should succeed on the retry, once the HelixManager has been refreshed.
+    HelixManager helixManagerOriginal = HelixAssignedParticipantCheck.getHelixManager();
+    helixManagerOriginal.disconnect();
+    check.execute();
+    //Ensure that a new HelixManager instance is created.
+    Assert.assertTrue(HelixAssignedParticipantCheck.getHelixManager() != helixManagerOriginal);
+
     //Create Helix config with invalid partition num. Ensure HelixAssignedParticipantCheck fails.
     helixConfig = helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(1));
     check = new HelixAssignedParticipantCheck(helixConfig);