You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/15 13:56:36 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-737] Add support for Helix quota-based task scheduling

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cf5286  [GOBBLIN-737] Add support for Helix quota-based task scheduling
0cf5286 is described below

commit 0cf5286c97b98fd457b8acdfe886dc1c50224148
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon Apr 15 06:56:23 2019 -0700

    [GOBBLIN-737] Add support for Helix quota-based task scheduling
    
    Closes #2604 from htran1/helix_quota_config
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  3 ++
 .../gobblin/cluster/GobblinClusterManager.java     | 37 ++++++++++++++++++++++
 .../gobblin/cluster/GobblinClusterManagerTest.java | 17 +++++++++-
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index f8604fc..8b33e4e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -96,6 +96,9 @@ public class GobblinClusterConfigurationKeys {
   // job spec operation
   public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete";
 
+  // Helix task quota ratios, as a comma-separated list of <quotaType>:<ratio> pairs where each ratio is an integer.
+  // Example: A:1,B:38,DEFAULT:1
+  public static final String HELIX_TASK_QUOTA_CONFIG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixTaskQuotaConfig";
 
   /**
    * A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 1722cc3..93a6712 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -41,12 +41,14 @@ import org.apache.helix.Criteria;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.eventbus.EventBus;
@@ -219,6 +221,39 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
   }
 
   /**
+   * Configure Helix quota-based task scheduling
+   */
+  @VisibleForTesting
+  void configureHelixQuotaBasedTaskScheduling() {
+    // set up the cluster quota config
+    List<String> quotaConfigList = ConfigUtils.getStringList(this.config,
+        GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY);
+
+    if (quotaConfigList.isEmpty()) {
+      return;
+    }
+
+    // retrieve the cluster config for updating
+    ClusterConfig clusterConfig = this.multiManager.getJobClusterHelixManager().getConfigAccessor()
+        .getClusterConfig(this.clusterName);
+    clusterConfig.resetTaskQuotaRatioMap();
+
+    for (String entry : quotaConfigList) {
+      List<String> quotaConfig = Splitter.on(":").limit(2).trimResults().omitEmptyStrings().splitToList(entry);
+
+      if (quotaConfig.size() < 2) {
+        throw new IllegalArgumentException(
+            "Quota configurations must be of the form <key1>:<value1>,<key2>:<value2>,...");
+      }
+
+      clusterConfig.setTaskQuotaRatio(quotaConfig.get(0), Integer.parseInt(quotaConfig.get(1)));
+    }
+
+    this.multiManager.getJobClusterHelixManager().getConfigAccessor()
+        .setClusterConfig(this.clusterName, clusterConfig); // Set the new ClusterConfig
+  }
+
+  /**
    * Start the Gobblin Cluster Manager.
    */
   @Override
@@ -228,6 +263,8 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     this.eventBus.register(this);
     this.multiManager.connect();
 
+    configureHelixQuotaBasedTaskScheduling();
+
     if (this.isStandaloneMode) {
       // standalone mode starts non-daemon threads later, so need to have this thread to keep process up
       this.idleProcessThread = new Thread(new Runnable() {
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
index 9adcd67..9069c1e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +81,8 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
     Config config = ConfigFactory.parseURL(url)
         .withValue("gobblin.cluster.zk.connection.string",
                    ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_QUOTA_CONFIG_KEY,
+            ConfigValueFactory.fromAnyRef("DEFAULT:1,OTHER:10"))
         .resolve();
 
     String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -94,7 +97,7 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
         new TestShutdownMessageHandlerFactory(this));
 
     this.gobblinClusterManager =
-        new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_APPLICATION_ID, config,
+        new GobblinClusterManager(GobblinClusterManagerTest.class.getSimpleName(), TestHelper.TEST_APPLICATION_ID, config,
             Optional.<Path>absent());
     this.gobblinClusterManager.getEventBus().register(this.gobblinClusterManager);
     this.gobblinClusterManager.connectHelixManager();
@@ -123,6 +126,18 @@ public class GobblinClusterManagerTest implements HelixMessageTestBase {
   }
 
   @Test
+  public void testQuotaConfig() throws Exception {
+    final String clusterName = GobblinClusterManagerTest.class.getSimpleName();
+    // The test config sets the quotas to "DEFAULT:1,OTHER:10"; applying them must update the Helix cluster config
+    this.gobblinClusterManager.configureHelixQuotaBasedTaskScheduling();
+    ClusterConfig updatedClusterConfig = this.gobblinClusterManager.multiManager.getJobClusterHelixManager()
+        .getConfigAccessor().getClusterConfig(clusterName);
+
+    Assert.assertEquals(updatedClusterConfig.getTaskQuotaRatio("DEFAULT"), "1");
+    Assert.assertEquals(updatedClusterConfig.getTaskQuotaRatio("OTHER"), "10");
+  }
+
+  @Test
   public void testSendShutdownRequest() throws Exception {
     Logger log = LoggerFactory.getLogger("testSendShutdownRequest");
     Closer closer = Closer.create();