You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/29 22:00:27 UTC
[gobblin] branch master updated: [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 89497c6 [GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)
89497c6 is described below
commit 89497c6305078da6c55f3aea4fce6642f2dd5583
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Nov 29 14:00:21 2021 -0800
[GOBBLIN-1575] use reference count in helix manager, so that connect/disconnect are called once and at the right time (#3427)
* make a copy of helix manager, so that each job can keep using it without worrying about disconnect
* address review comments
* address review comments
* use reference count to connect/disconnect only once
* address review comments
---
...GobblinHelixDistributeJobExecutionLauncher.java | 14 ++----
.../gobblin/cluster/GobblinHelixJobLauncher.java | 8 ++--
.../gobblin/cluster/GobblinHelixJobTask.java | 4 ++
.../cluster/GobblinHelixManagerFactory.java | 29 ++++++++++++
.../gobblin/cluster/GobblinHelixMultiManager.java | 21 ++++-----
.../GobblinReferenceCountingZkHelixManager.java | 54 ++++++++++++++++++++++
.../apache/gobblin/cluster/GobblinTaskRunner.java | 10 ++--
.../cluster/HelixRetriggeringJobCallable.java | 14 +++---
8 files changed, 116 insertions(+), 38 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 2e9999e..2f03225 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -110,12 +110,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
@Getter
private DistributeJobMonitor jobMonitor;
- public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
- if (builder.taskDriverHelixManager.isPresent()) {
- this.planningJobHelixManager = builder.taskDriverHelixManager.get();
- } else {
- this.planningJobHelixManager = builder.jobHelixManager;
- }
+ public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
+ this.planningJobHelixManager = builder.planningJobHelixManager;
+
this.helixTaskDriver = new TaskDriver(this.planningJobHelixManager);
this.sysProps = builder.sysProps;
this.jobPlanningProps = builder.jobPlanningProps;
@@ -151,7 +148,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
// work flow should never be deleted explicitly because it has an expiry time
// If cancellation is requested, we should set the job state to CANCELLED/ABORT
this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000);
- log.info("Stopped the workflow ", planningJobId);
+ log.info("Stopped the workflow {}", planningJobId);
}
} catch (HelixException e) {
// Cancellation may throw an exception, but Helix sets the job state to STOP and it should eventually stop
@@ -168,8 +165,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
public static class Builder {
Properties sysProps;
Properties jobPlanningProps;
- HelixManager jobHelixManager;
- Optional<HelixManager> taskDriverHelixManager;
+ HelixManager planningJobHelixManager;
Path appWorkDir;
GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
GobblinHelixMetrics helixMetrics;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 4d48fcd..80b7423 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.cluster;
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
@@ -44,6 +40,10 @@ import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
index 78127b2..218d583 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java
@@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -143,10 +144,12 @@ class GobblinHelixJobTask implements Task {
/**
* Launch the actual {@link GobblinHelixJobLauncher}.
*/
+ @SneakyThrows
@Override
public TaskResult run() {
log.info("Running planning job {} [{} {}]", this.planningJobId, this.applicationName, this.instanceName);
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
+ this.jobHelixManager.connect();
try (Closer closer = Closer.create()) {
Optional<String> planningIdFromStateStore = this.jobsMapping.getPlanningJobId(jobUri);
@@ -203,6 +206,7 @@ class GobblinHelixJobTask implements Task {
return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
.getFullStackTrace(e));
} finally {
+ this.jobHelixManager.disconnect();
// always cleanup the job mapping for current job name.
try {
this.jobsMapping.deleteMapping(jobUri);
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java
new file mode 100644
index 0000000..b31f363
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixManagerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import org.apache.helix.InstanceType;
+
+
+public class GobblinHelixManagerFactory {
+
+ public static GobblinReferenceCountingZkHelixManager getZKHelixManager(String clusterName, String instanceName,
+ InstanceType type, String zkAddr) {
+ return new GobblinReferenceCountingZkHelixManager(clusterName, instanceName, type, zkAddr);
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
index d85e1f1..83a251e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java
@@ -30,7 +30,6 @@ import java.util.function.Function;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
@@ -160,30 +159,28 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
/**
* Build the {@link HelixManager} for the Application Master.
*/
- protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) {
+ protected static HelixManager buildHelixManager(Config config, String clusterName, InstanceType type) {
+ Preconditions.checkArgument(config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
+ String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ log.info("Using ZooKeeper connection string: " + zkConnectionString);
+
String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
GobblinClusterManager.class.getSimpleName());
- return HelixManagerFactory.getZKHelixManager(
+ return GobblinHelixManagerFactory.getZKHelixManager(
config.getString(clusterName), helixInstanceName, type, zkConnectionString);
}
public void initialize() {
- Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
- String zkConnectionString = this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- log.info("Using ZooKeeper connection string: " + zkConnectionString);
-
if (this.dedicatedManagerCluster) {
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
log.info("We will use separate clusters to manage GobblinClusterManager and job distribution.");
// This will create and register a Helix controller in ZooKeeper
this.managerClusterHelixManager = buildHelixManager(this.config,
- zkConnectionString,
GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER);
// This will create a Helix administrator to dispatch jobs to ZooKeeper
this.jobClusterHelixManager = buildHelixManager(this.config,
- zkConnectionString,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
InstanceType.ADMINISTRATOR);
@@ -195,14 +192,13 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
if (this.dedicatedJobClusterController) {
this.jobClusterController = Optional.of(GobblinHelixMultiManager
- .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
+ .buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER));
}
if (this.dedicatedTaskDriverCluster) {
// This will create a Helix administrator to dispatch jobs to ZooKeeper
this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config,
- zkConnectionString,
GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
InstanceType.ADMINISTRATOR));
@@ -216,7 +212,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
// This will create a dedicated controller for planning job distribution
if (dedicatedTaskDriverClusterController) {
this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager
- .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
+ .buildHelixManager(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER));
}
}
@@ -226,7 +222,6 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge {
boolean isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
this.managerClusterHelixManager = buildHelixManager(this.config,
- zkConnectionString,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
this.jobClusterHelixManager = this.managerClusterHelixManager;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java
new file mode 100644
index 0000000..7e34d20
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinReferenceCountingZkHelixManager.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link ZKHelixManager} which keeps a reference count of its users.
+ * Every user should call connect before using the manager and disconnect when done, which increments and decrements the count.
+ * The underlying ZKHelixManager is connected only on the first connect call and disconnected only when the last user disconnects.
+ */
+@Slf4j
+public class GobblinReferenceCountingZkHelixManager extends ZKHelixManager {
+ private final AtomicInteger usageCount = new AtomicInteger(0);
+
+ public GobblinReferenceCountingZkHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) {
+ super(clusterName, instanceName, instanceType, zkAddress);
+ }
+
+ @Override
+ public void connect() throws Exception {
+ if (usageCount.incrementAndGet() == 1) {
+ super.connect();
+ }
+ }
+
+ @Override
+ public void disconnect() {
+ if (usageCount.decrementAndGet() <= 0) {
+ super.disconnect();
+ }
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 7045c29..cad5839 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -48,7 +48,6 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
@@ -285,18 +284,18 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
// This will create a Helix manager to receive the planning job
- this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager(
+ this.taskDriverHelixManager = Optional.of(GobblinHelixManagerFactory.getZKHelixManager(
ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
this.helixInstanceName,
InstanceType.PARTICIPANT,
zkConnectionString));
- this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+ this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
this.clusterName,
this.helixInstanceName,
InstanceType.ADMINISTRATOR,
zkConnectionString);
} else {
- this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
+ this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
this.clusterName,
this.helixInstanceName,
InstanceType.PARTICIPANT,
@@ -305,8 +304,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
}
private HelixManager getReceiverManager() {
- return taskDriverHelixManager.isPresent()?taskDriverHelixManager.get()
- : this.jobHelixManager;
+ return taskDriverHelixManager.isPresent() ? taskDriverHelixManager.get() : this.jobHelixManager;
}
private TaskStateModelFactory createTaskStateModelFactory(Map<String, TaskFactory> taskFactoryMap) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 3cbc7f4..735da83 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -218,6 +218,7 @@ class HelixRetriggeringJobCallable implements Callable {
*/
private void runJobLauncherLoop() throws JobException {
try {
+ this.jobHelixManager.connect();
while (true) {
currentJobLauncher = buildJobLauncherForCentralizedMode(jobScheduler, jobProps);
// in "run once" case, job scheduler will remove current job from the scheduler
@@ -233,6 +234,7 @@ class HelixRetriggeringJobCallable implements Callable {
log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
} finally {
+ this.jobHelixManager.disconnect();
currentJobLauncher = null;
}
}
@@ -249,8 +251,8 @@ class HelixRetriggeringJobCallable implements Callable {
String newPlanningId;
Closer closer = Closer.create();
try {
- HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()?
- this.taskDriverHelixManager.get() : this.jobHelixManager;
+ HelixManager planningJobHelixManager = this.taskDriverHelixManager.orElse(this.jobHelixManager);
+ planningJobHelixManager.connect();
String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
@@ -263,7 +265,7 @@ class HelixRetriggeringJobCallable implements Callable {
jobLock.lock();
try {
- if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobManager)) {
+ if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
planningJobLauncherMetrics.skippedPlanningJobs.mark();
return;
}
@@ -284,8 +286,7 @@ class HelixRetriggeringJobCallable implements Callable {
builder.setSysProps(this.sysProps);
builder.setJobPlanningProps(jobPlanningProps);
- builder.setJobHelixManager(this.jobHelixManager);
- builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
+ builder.setPlanningJobHelixManager(planningJobHelixManager);
builder.setAppWorkDir(this.appWorkDir);
builder.setJobsMapping(this.jobsMapping);
builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
@@ -310,8 +311,9 @@ class HelixRetriggeringJobCallable implements Callable {
// make sure the planning job is initialized (or visible) to other parallel running threads,
// so that the same critical section check (querying Helix for job completeness)
// can be applied.
- HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId);
+ HelixUtils.waitJobInitialization(planningJobHelixManager, newPlanningId, newPlanningId);
} finally {
+ planningJobHelixManager.disconnect();
// end of the critical section to check if a job with same job name is running
jobLock.unlock();
}