You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/29 22:00:27 UTC

[gobblin] branch master updated: [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 89497c6  [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)
89497c6 is described below

commit 89497c6305078da6c55f3aea4fce6642f2dd5583
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Nov 29 14:00:21 2021 -0800

    [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)
    
    * make a copy of helix manager, so that each job can keep using it without worrying about disconnect
    
    * address review comments
    
    * address review comments
    
    * use reference count to connect/disconnect only once
    
    * address review comments
---
 ...GobblinHelixDistributeJobExecutionLauncher.java | 14 ++----
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  8 ++--
 .../gobblin/cluster/GobblinHelixJobTask.java       |  4 ++
 .../cluster/GobblinHelixManagerFactory.java        | 29 ++++++++++++
 .../gobblin/cluster/GobblinHelixMultiManager.java  | 21 ++++-----
 .../GobblinReferenceCountingZkHelixManager.java    | 54 ++++++++++++++++++++++
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 10 ++--
 .../cluster/HelixRetriggeringJobCallable.java      | 14 +++---
 8 files changed, 116 insertions(+), 38 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 2e9999e..2f03225 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -110,12 +110,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
   @Getter
   private DistributeJobMonitor jobMonitor;
 
-  public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
-    if (builder.taskDriverHelixManager.isPresent()) {
-      this.planningJobHelixManager = builder.taskDriverHelixManager.get();
-    } else {
-      this.planningJobHelixManager = builder.jobHelixManager;
-    }
+  public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
+    this.planningJobHelixManager = builder.planningJobHelixManager;
+
     this.helixTaskDriver = new TaskDriver(this.planningJobHelixManager);
     this.sysProps = builder.sysProps;
     this.jobPlanningProps = builder.jobPlanningProps;
@@ -151,7 +148,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
           // work flow should never be deleted explicitly because it has a expiry time
           // If cancellation is requested, we should set the job state to CANCELLED/ABORT
           this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000);
-          log.info("Stopped the workflow ", planningJobId);
+          log.info("Stopped the workflow {}", planningJobId);
         }
       } catch (HelixException e) {
         // Cancellation may throw an exception, but Helix set the job state to STOP and it should eventually stop
@@ -168,8 +165,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
   public static class Builder {
     Properties sysProps;
     Properties jobPlanningProps;
-    HelixManager jobHelixManager;
-    Optional<HelixManager> taskDriverHelixManager;
+    HelixManager planningJobHelixManager;
     Path appWorkDir;
     GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
     GobblinHelixMetrics helixMetrics;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 4d48fcd..80b7423 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
@@ -44,6 +40,10 @@ import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 78127b2..218d583 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Closer;
 import com.typesafe.config.Config;
 
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -143,10 +144,12 @@ class GobblinHelixJobTask implements Task {
   /**
    * Launch the actual {@link GobblinHelixJobLauncher}.
    */
+  @SneakyThrows
   @Override
   public TaskResult run() {
     log.info("Running planning job {} [{} {}]", this.planningJobId, this.applicationName, this.instanceName);
     this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
+    this.jobHelixManager.connect();
 
     try (Closer closer = Closer.create()) {
       Optional<String> planningIdFromStateStore = this.jobsMapping.getPlanningJobId(jobUri);
@@ -203,6 +206,7 @@ class GobblinHelixJobTask implements Task {
       return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
           .getFullStackTrace(e));
     } finally {
+      this.jobHelixManager.disconnect();
       // always cleanup the job mapping for current job name.
       try {
         this.jobsMapping.deleteMapping(jobUri);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java
new file mode 100644
index 0000000..b31f363
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.helix.InstanceType;
+
+
+public class GobblinHelixManagerFactory {
+  /** Creates a new {@link GobblinReferenceCountingZkHelixManager} from the given arguments, passed through unchanged. */
+  public static GobblinReferenceCountingZkHelixManager getZKHelixManager(String clusterName, String instanceName,
+      InstanceType type, String zkAddr) {
+    return new GobblinReferenceCountingZkHelixManager(clusterName, instanceName, type, zkAddr);
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index d85e1f1..83a251e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -30,7 +30,6 @@ import java.util.function.Function;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
@@ -160,30 +159,28 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
   /**
    * Build the {@link HelixManager} for the Application Master.
    */
-  protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) {
+  protected static HelixManager buildHelixManager(Config config, String clusterName, InstanceType type) {
+    Preconditions.checkArgument(config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+    String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
     String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
         GobblinClusterManager.class.getSimpleName());
-    return HelixManagerFactory.getZKHelixManager(
+    return GobblinHelixManagerFactory.getZKHelixManager(
         config.getString(clusterName), helixInstanceName, type, zkConnectionString);
   }
 
   public void initialize() {
-    Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
-    String zkConnectionString = this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    log.info("Using ZooKeeper connection string: " + zkConnectionString);
-
     if (this.dedicatedManagerCluster) {
       Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
       log.info("We will use separate clusters to manage GobblinClusterManager and job distribution.");
       // This will create and register a Helix controller in ZooKeeper
       this.managerClusterHelixManager = buildHelixManager(this.config,
-          zkConnectionString,
           GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY,
           InstanceType.CONTROLLER);
 
       // This will create a Helix administrator to dispatch jobs to ZooKeeper
       this.jobClusterHelixManager = buildHelixManager(this.config,
-          zkConnectionString,
           GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
           InstanceType.ADMINISTRATOR);
 
@@ -195,14 +192,13 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
 
       if (this.dedicatedJobClusterController) {
         this.jobClusterController = Optional.of(GobblinHelixMultiManager
-            .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
+            .buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
                 InstanceType.CONTROLLER));
       }
 
       if (this.dedicatedTaskDriverCluster) {
         // This will create a Helix administrator to dispatch jobs to ZooKeeper
         this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config,
-            zkConnectionString,
             GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
             InstanceType.ADMINISTRATOR));
 
@@ -216,7 +212,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
         // This will create a dedicated controller for planning job distribution
         if (dedicatedTaskDriverClusterController) {
           this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager
-              .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
+              .buildHelixManager(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
                   InstanceType.CONTROLLER));
         }
       }
@@ -226,7 +222,6 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
       boolean isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
           GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
       this.managerClusterHelixManager = buildHelixManager(this.config,
-          zkConnectionString,
           GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
           isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
       this.jobClusterHelixManager = this.managerClusterHelixManager;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java
new file mode 100644
index 0000000..7e34d20
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link ZKHelixManager} that is shared by several users and keeps a reference count of them.
+ * Every user should call connect and disconnect to increase and decrease the count; only the first connect and
+ * the last disconnect reach the underlying ZKHelixManager. The count is clamped at zero (see {@link #disconnect()}).
+ */
+@Slf4j
+public class GobblinReferenceCountingZkHelixManager extends ZKHelixManager {
+  private final AtomicInteger usageCount = new AtomicInteger(0);
+
+  public GobblinReferenceCountingZkHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) {
+    super(clusterName, instanceName, instanceType, zkAddress);
+  }
+  /** Registers one more user; only the transition from zero to one user connects the underlying manager. */
+  @Override
+  public void connect() throws Exception {
+    if (usageCount.incrementAndGet() == 1) {
+      super.connect();
+    }
+  }
+  /** Releases one user. The count stops at zero so a surplus call cannot leave it negative and block reconnects. */
+  @Override
+  public void disconnect() {
+    if (usageCount.updateAndGet(count -> Math.max(0, count - 1)) == 0) {
+      super.disconnect();
+    }
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 7045c29..cad5839 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -48,7 +48,6 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
@@ -285,18 +284,18 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
 
     if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
       // This will create a Helix manager to receive the planning job
-      this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager(
+      this.taskDriverHelixManager = Optional.of(GobblinHelixManagerFactory.getZKHelixManager(
           ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
           this.helixInstanceName,
           InstanceType.PARTICIPANT,
           zkConnectionString));
-      this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+      this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
           this.clusterName,
           this.helixInstanceName,
           InstanceType.ADMINISTRATOR,
           zkConnectionString);
     } else {
-      this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+      this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
           this.clusterName,
           this.helixInstanceName,
           InstanceType.PARTICIPANT,
@@ -305,8 +304,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
   }
 
   private HelixManager getReceiverManager() {
-    return taskDriverHelixManager.isPresent()?taskDriverHelixManager.get()
-        : this.jobHelixManager;
+    return taskDriverHelixManager.isPresent() ? taskDriverHelixManager.get() : this.jobHelixManager;
   }
 
   private TaskStateModelFactory createTaskStateModelFactory(Map<String, TaskFactory> taskFactoryMap) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 3cbc7f4..735da83 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -218,6 +218,7 @@ class HelixRetriggeringJobCallable implements Callable {
    */
    private void runJobLauncherLoop() throws JobException {
     try {
+      this.jobHelixManager.connect();
       while (true) {
         currentJobLauncher = buildJobLauncherForCentralizedMode(jobScheduler, jobProps);
         // in "run once" case, job scheduler will remove current job from the scheduler
@@ -233,6 +234,7 @@ class HelixRetriggeringJobCallable implements Callable {
       log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
       throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     } finally {
+      this.jobHelixManager.disconnect();
       currentJobLauncher = null;
     }
   }
@@ -249,8 +251,8 @@ class HelixRetriggeringJobCallable implements Callable {
     String newPlanningId;
     Closer closer = Closer.create();
     try {
-      HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()?
-          this.taskDriverHelixManager.get() : this.jobHelixManager;
+      HelixManager planningJobHelixManager = this.taskDriverHelixManager.orElse(this.jobHelixManager);
+      planningJobHelixManager.connect();
 
       String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
           GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
@@ -263,7 +265,7 @@ class HelixRetriggeringJobCallable implements Callable {
       jobLock.lock();
 
       try {
-        if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobManager)) {
+        if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
           planningJobLauncherMetrics.skippedPlanningJobs.mark();
           return;
         }
@@ -284,8 +286,7 @@ class HelixRetriggeringJobCallable implements Callable {
 
         builder.setSysProps(this.sysProps);
         builder.setJobPlanningProps(jobPlanningProps);
-        builder.setJobHelixManager(this.jobHelixManager);
-        builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
+        builder.setPlanningJobHelixManager(planningJobHelixManager);
         builder.setAppWorkDir(this.appWorkDir);
         builder.setJobsMapping(this.jobsMapping);
         builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
@@ -310,8 +311,9 @@ class HelixRetriggeringJobCallable implements Callable {
         // make sure the planning job is initialized (or visible) to other parallel running threads,
         // so that the same critical section check (querying Helix for job completeness)
         // can be applied.
-        HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId);
+        HelixUtils.waitJobInitialization(planningJobHelixManager, newPlanningId, newPlanningId);
       } finally {
+        planningJobHelixManager.disconnect();
         // end of the critical section to check if a job with same job name is running
         jobLock.unlock();
       }