You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/15 13:56:36 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-737] Add support for Helix quota-based task scheduling
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0cf5286 [GOBBLIN-737] Add support for Helix quota-based task scheduling
0cf5286 is described below
commit 0cf5286c97b98fd457b8acdfe886dc1c50224148
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon Apr 15 06:56:23 2019 -0700
[GOBBLIN-737] Add support for Helix quota-based task scheduling
Closes #2604 from htran1/helix_quota_config
---
.../cluster/GobblinClusterConfigurationKeys.java | 3 ++
.../gobblin/cluster/GobblinClusterManager.java | 37 ++++++++++++++++++++++
.../gobblin/cluster/GobblinClusterManagerTest.java | 17 +++++++++-
3 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index f8604fc..8b33e4e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -96,6 +96,9 @@ public class GobblinClusterConfigurationKeys {
// job spec operation
public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete";
+ // Helix task quota configuration: a comma-separated list of <quotaType>:<ratio> pairs, where the quota type
+ // and its integer ratio are separated by a colon. Example: A:1,B:38,DEFAULT:1
+ public static final String HELIX_TASK_QUOTA_CONFIG_KEY = "gobblin.cluster.helixTaskQuotaConfig";
/**
* A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 1722cc3..93a6712 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -41,12 +41,14 @@ import org.apache.helix.Criteria;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
@@ -219,6 +221,39 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
}
/**
+   * Configure Helix quota-based task scheduling from the list configured under
+   * {@link GobblinClusterConfigurationKeys#HELIX_TASK_QUOTA_CONFIG_KEY}.
+   *
+   * <p>Each entry must have the form {@code <quotaType>:<ratio>} with an integer ratio. If the configured list
+   * is empty the Helix cluster config is not touched. Otherwise the cluster config is fetched, its task quota
+   * ratio map is reset, one ratio is set per entry, and the updated cluster config is written back.
+   *
+   * @throws IllegalArgumentException if an entry is not of the form {@code <quotaType>:<ratio>} or if its ratio
+   *         is not a valid integer; nothing is written back to Helix in that case
+   */
+  @VisibleForTesting
+  void configureHelixQuotaBasedTaskScheduling() {
+    // set up the cluster quota config
+    List<String> quotaConfigList = ConfigUtils.getStringList(this.config,
+        GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY);
+
+    if (quotaConfigList.isEmpty()) {
+      return;
+    }
+
+    // retrieve the cluster config for updating
+    ClusterConfig clusterConfig = this.multiManager.getJobClusterHelixManager().getConfigAccessor()
+        .getClusterConfig(this.clusterName);
+    clusterConfig.resetTaskQuotaRatioMap();
+
+    for (String entry : quotaConfigList) {
+      // limit(2) keeps everything after the first colon in the value part, so "A:1:2" fails integer parsing below
+      List<String> quotaConfig = Splitter.on(":").limit(2).trimResults().omitEmptyStrings().splitToList(entry);
+
+      if (quotaConfig.size() < 2) {
+        throw new IllegalArgumentException(
+            "Quota configurations must be of the form <key1>:<value1>,<key2>:<value2>,... but found entry '"
+                + entry + "' in " + GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY);
+      }
+
+      int quotaRatio;
+      try {
+        quotaRatio = Integer.parseInt(quotaConfig.get(1));
+      } catch (NumberFormatException nfe) {
+        // Re-throw with the offending entry so a bad configuration can be located; the cause is preserved.
+        throw new IllegalArgumentException(
+            "Quota ratio must be an integer but found entry '" + entry + "' in "
+                + GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY, nfe);
+      }
+
+      clusterConfig.setTaskQuotaRatio(quotaConfig.get(0), quotaRatio);
+    }
+
+    this.multiManager.getJobClusterHelixManager().getConfigAccessor()
+        .setClusterConfig(this.clusterName, clusterConfig); // Set the new ClusterConfig
+  }
+
+ /**
* Start the Gobblin Cluster Manager.
*/
@Override
@@ -228,6 +263,8 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
this.eventBus.register(this);
this.multiManager.connect();
+ configureHelixQuotaBasedTaskScheduling();
+
if (this.isStandaloneMode) {
// standalone mode starts non-daemon threads later, so need to have this thread to keep process up
this.idleProcessThread = new Thread(new Runnable() {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
index 9adcd67..9069c1e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +81,8 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
Config config = ConfigFactory.parseURL(url)
.withValue("gobblin.cluster.zk.connection.string",
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY,
+ ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10"))
.resolve();
String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -94,7 +97,7 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
new TestShutdownMessageHandlerFactory(this));
this.gobblinClusterManager =
- new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_APPLICATION_ID, config,
+ new GobblinClusterManager(GobblinClusterManagerTest.class.getSimpleName(), TestHelper.TEST_APPLICATION_ID, config,
Optional.<Path>absent());
this.gobblinClusterManager.getEventBus().register(this.gobblinClusterManager);
this.gobblinClusterManager.connectHelixManager();
@@ -123,6 +126,18 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
}
@Test
+  public void testQuotaConfig() throws Exception {
+    // Push the quota settings from the manager's config into the Helix cluster config.
+    this.gobblinClusterManager.configureHelixQuotaBasedTaskScheduling();
+
+    // Read the cluster config back through the job cluster's config accessor and verify the stored ratios.
+    String clusterName = GobblinClusterManagerTest.class.getSimpleName();
+    ClusterConfig storedConfig = this.gobblinClusterManager.multiManager.getJobClusterHelixManager()
+        .getConfigAccessor()
+        .getClusterConfig(clusterName);
+
+    Assert.assertEquals(storedConfig.getTaskQuotaRatio("DEFAULT"), "1");
+    Assert.assertEquals(storedConfig.getTaskQuotaRatio("OTHER"), "10");
+  }
+
+ @Test
public void testSendShutdownRequest() throws Exception {
Logger log = LoggerFactory.getLogger("testSendShutdownRequest");
Closer closer = Closer.create();