You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/24 22:37:29 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRu…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e201c  [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRu…
66e201c is described below

commit 66e201ceefad1b97fcad83b50f2954e48ef2d0f4
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jan 24 14:37:22 2020 -0800

    [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRu…
    
    Closes #2874 from sv2000/helixInstanceTags
---
 .../cluster/GobblinClusterConfigurationKeys.java      |  3 +++
 .../org/apache/gobblin/cluster/GobblinTaskRunner.java |  3 +++
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java    | 12 +++++++++---
 .../java/org/apache/gobblin/yarn/YarnService.java     | 19 +++++++++++++------
 4 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index fdf1eca..28339f4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -67,6 +67,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + "work.unit.file.path";
   public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name";
   public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceName";
+
+  public static final String HELIX_INSTANCE_TAGS_OPTION_NAME = "helix_instance_tags";
+
   // The number of tasks that can be running concurrently in the same worker process
   public static final String HELIX_CLUSTER_TASK_CONCURRENCY = GOBBLIN_CLUSTER_PREFIX + "helix.taskConcurrency";
   public static final int HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT = 40;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index f3edbd1..5ada074 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
@@ -602,6 +603,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
         "Application name");
     options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
         "Helix instance name");
+    options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+        .hasArg(true).required(false).desc("Helix instance tags").build());
     return options;
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d3bb523..6e8c72f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -40,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -189,9 +189,15 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
           ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
       String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
       String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
+      String helixInstanceTags = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
 
+      Config config = ConfigFactory.load();
+      if (!Strings.isNullOrEmpty(helixInstanceTags)) {
+        config = config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, ConfigValueFactory.fromAnyRef(helixInstanceTags));
+      }
+        
       GobblinTaskRunner gobblinTaskRunner =
-          new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, ConfigFactory.load(),
+          new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
               Optional.<Path>absent());
       gobblinTaskRunner.start();
     } catch (ParseException pe) {
@@ -199,4 +205,4 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
       System.exit(1);
     }
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 4910a5f..d10d3ac 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -103,6 +103,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
 import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
 import org.apache.gobblin.yarn.event.NewContainerRequest;
@@ -123,6 +124,7 @@ public class YarnService extends AbstractIdleService {
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
+  private final String helixInstanceTags;
 
   private final Config config;
 
@@ -225,6 +227,7 @@ public class YarnService extends AbstractIdleService {
     this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
 
     this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
+    this.helixInstanceTags = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
 
     this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
         Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
@@ -560,7 +563,7 @@ public class YarnService extends AbstractIdleService {
   @VisibleForTesting
   protected String buildContainerCommand(Container container, String helixInstanceName) {
     String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
-    return new StringBuilder()
+    StringBuilder containerCommand = new StringBuilder()
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
         .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
@@ -572,12 +575,16 @@ public class YarnService extends AbstractIdleService {
         .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
         .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
-        .append(" ").append(helixInstanceName)
-        .append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
+        .append(" ").append(helixInstanceName);
+
+    if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+      containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+          .append(" ").append(helixInstanceTags);
+    }
+    return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
           containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
-          containerProcessName).append(".").append(ApplicationConstants.STDERR)
-        .toString();
+          containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
   }
 
   /**