You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/04/18 00:07:45 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1120]
Reinitialize HelixManager when Helix participant check throws an
exception[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7a634b4 [GOBBLIN-1120] Reinitialize HelixManager when Helix participant check throws an exception[]
7a634b4 is described below
commit 7a634b450958e44783f236da4513008a4b8d95ec
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Apr 17 17:07:35 2020 -0700
[GOBBLIN-1120] Reinitialize HelixManager when Helix participant check throws an exception[]
Closes #2960 from
sv2000/helixAssignedParticipantCheck
---
.../cluster/HelixAssignedParticipantCheck.java | 53 +++++++++++++++-------
.../cluster/HelixAssignedParticipantCheckTest.java | 8 ++++
2 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 065eb6f..2102365 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -16,7 +16,6 @@
*/
package org.apache.gobblin.cluster;
-import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -32,8 +31,10 @@ import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
+import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alias;
@@ -52,6 +53,8 @@ import org.apache.gobblin.commit.CommitStepException;
@Slf4j
@Alias (value = "HelixParticipantCheck")
public class HelixAssignedParticipantCheck implements CommitStep {
+ @Getter
+ @VisibleForTesting
private static volatile HelixManager helixManager = null;
private static volatile Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
.retryIfException()
@@ -61,6 +64,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
private final String helixInstanceName;
private final String helixJob;
private final int partitionNum;
+ private final Config config;
private boolean isCompleted;
@@ -69,7 +73,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
* @param config
* @return
*/
- public static HelixManager getHelixManager(Config config) {
+ public static void initHelixManager(Config config) throws Exception {
if (helixManager == null) {
synchronized (HelixAssignedParticipantCheck.class) {
if (helixManager == null) {
@@ -77,14 +81,29 @@ public class HelixAssignedParticipantCheck implements CommitStep {
String clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
helixManager = HelixManagerFactory.getZKHelixManager(clusterName, HelixAssignedParticipantCheck.class.getSimpleName(),
InstanceType.SPECTATOR, zkConnectString);
+ helixManager.connect();
}
}
}
- return helixManager;
}
- public HelixAssignedParticipantCheck(Config config) {
- getHelixManager(config);
+ /**
+ * Replace the shared static {@link HelixManager} with a freshly initialized and connected one. Called from {@link #execute()}
+ * when a Helix query throws (e.g. because the underlying ZkClient was closed); the old instance is disconnected first.
+ * @throws Exception propagated from initHelixManager, e.g. if connecting the new instance fails.
+ */
+ private void refreshHelixManager() throws Exception {
+ synchronized (HelixAssignedParticipantCheck.class) {
+ //Disconnect the existing instance to close any open connections before replacing it.
+ helixManager.disconnect(); // NOTE(review): if execute() runs concurrently, this may disconnect an instance another caller just refreshed - TODO refresh only if helixManager is still the instance that failed.
+ helixManager = null;
+ initHelixManager(config); // NOTE(review): initHelixManager assigns helixManager before connect(); if connect() throws, an unconnected instance stays published until the next refresh.
+ }
+ }
+
+ public HelixAssignedParticipantCheck(Config config) throws Exception {
+ this.config = config;
+ initHelixManager(config);
this.helixInstanceName = config.getString(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY);
this.helixJob = config.getString(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY);
this.partitionNum = config.getInt(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY);
@@ -94,8 +113,7 @@ public class HelixAssignedParticipantCheck implements CommitStep {
* Determine whether the commit step has been completed.
*/
@Override
- public boolean isCompleted()
- throws IOException {
+ public boolean isCompleted() {
return isCompleted;
}
@@ -104,20 +122,23 @@ public class HelixAssignedParticipantCheck implements CommitStep {
*/
@Override
public void execute() throws CommitStepException {
- if (!helixManager.isConnected()) {
- try {
- helixManager.connect();
- } catch (Exception e) {
- throw new CommitStepException(String.format("Helix instance %s unable to connect to Helix/ZK", helixInstanceName));
- }
- }
- TaskDriver taskDriver = new TaskDriver(helixManager);
log.info(String.format("HelixParticipantCheck step called for Helix Instance: %s, Helix job: %s, Helix partition: %d",
this.helixInstanceName, this.helixJob, this.partitionNum));
//Query Helix to get the currently assigned participant for the Helix partitionNum
Callable callable = () -> {
- JobContext jobContext = taskDriver.getJobContext(helixJob);
+ JobContext jobContext;
+ try {
+ TaskDriver taskDriver = new TaskDriver(helixManager);
+ jobContext = taskDriver.getJobContext(helixJob);
+ } catch (Exception e) {
+ log.info("Encountered exception when executing " + getClass().getSimpleName(), e);
+ log.info("Refreshing Helix manager..");
+ refreshHelixManager();
+ //Rethrow the exception to trigger a retry.
+ throw e;
+ }
+
if (jobContext != null) {
String participant = jobContext.getAssignedParticipant(partitionNum);
if (participant != null) {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index 1aecd94..40c341e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -84,6 +84,14 @@ public class HelixAssignedParticipantCheckTest {
// (i.e. no exceptions thrown).
check.execute();
+ //Disconnect the helixmanager used to check the assigned participant to force an Exception on the first attempt.
+ //The test should succeed on the following attempt.
+ HelixManager helixManagerOriginal = HelixAssignedParticipantCheck.getHelixManager();
+ helixManagerOriginal.disconnect();
+ check.execute();
+ //Ensure that a new HelixManager instance is created.
+ Assert.assertTrue(HelixAssignedParticipantCheck.getHelixManager() != helixManagerOriginal);
+
//Create Helix config with invalid partition num. Ensure HelixAssignedParticipantCheck fails.
helixConfig = helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(1));
check = new HelixAssignedParticipantCheck(helixConfig);