You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/01/24 22:37:29 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRunner
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 66e201c [GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRunner
66e201c is described below
commit 66e201ceefad1b97fcad83b50f2954e48ef2d0f4
Author: sv2000 <su...@gmail.com>
AuthorDate: Fri Jan 24 14:37:22 2020 -0800
[GOBBLIN-1032] Provide Helix instance tags config to GobblinYarnTaskRunner
Closes #2874 from sv2000/helixInstanceTags
---
.../cluster/GobblinClusterConfigurationKeys.java | 3 +++
.../org/apache/gobblin/cluster/GobblinTaskRunner.java | 3 +++
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 12 +++++++++---
.../java/org/apache/gobblin/yarn/YarnService.java | 19 +++++++++++++------
4 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index fdf1eca..28339f4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -67,6 +67,9 @@ public class GobblinClusterConfigurationKeys {
public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + "work.unit.file.path";
public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name";
public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceName";
+
+ public static final String HELIX_INSTANCE_TAGS_OPTION_NAME = "helix_instance_tags";
+
// The number of tasks that can be running concurrently in the same worker process
public static final String HELIX_CLUSTER_TASK_CONCURRENCY = GOBBLIN_CLUSTER_PREFIX + "helix.taskConcurrency";
public static final int HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT = 40;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index f3edbd1..5ada074 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
@@ -602,6 +603,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
"Application name");
options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
"Helix instance name");
+ options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .hasArg(true).required(false).desc("Helix instance tags").build());
return options;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index d3bb523..6e8c72f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -40,6 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -189,9 +189,15 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
String helixInstanceName = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME);
+ String helixInstanceTags = cmd.getOptionValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME);
+ Config config = ConfigFactory.load();
+ if (!Strings.isNullOrEmpty(helixInstanceTags)) {
+ config = config.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, ConfigValueFactory.fromAnyRef(helixInstanceTags));
+ }
+
GobblinTaskRunner gobblinTaskRunner =
- new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, ConfigFactory.load(),
+ new GobblinYarnTaskRunner(applicationName, helixInstanceName, containerId, config,
Optional.<Path>absent());
gobblinTaskRunner.start();
} catch (ParseException pe) {
@@ -199,4 +205,4 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
System.exit(1);
}
}
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 4910a5f..d10d3ac 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -103,6 +103,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
@@ -123,6 +124,7 @@ public class YarnService extends AbstractIdleService {
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
+ private final String helixInstanceTags;
private final Config config;
@@ -225,6 +227,7 @@ public class YarnService extends AbstractIdleService {
this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
+ this.helixInstanceTags = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
@@ -560,7 +563,7 @@ public class YarnService extends AbstractIdleService {
@VisibleForTesting
protected String buildContainerCommand(Container container, String helixInstanceName) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
- return new StringBuilder()
+ StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
.append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
@@ -572,12 +575,16 @@ public class YarnService extends AbstractIdleService {
.append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
.append(" ").append(this.applicationName)
.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
- .append(" ").append(helixInstanceName)
- .append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
+ .append(" ").append(helixInstanceName);
+
+ if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+ containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .append(" ").append(helixInstanceTags);
+ }
+ return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
- containerProcessName).append(".").append(ApplicationConstants.STDERR)
- .toString();
+ containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}
/**