You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/11 20:12:03 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5be1702 [GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers
5be1702 is described below
commit 5be170277d537b9d3f3ad738e06fce87ce001ae0
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Thu Apr 11 13:11:54 2019 -0700
[GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers
Closes #2599 from htran1/azkaban_yarn
---
.../gobblin/cluster/GobblinClusterUtils.java | 26 ++++++++++++++++++++++
.../apache/gobblin/cluster/GobblinHelixTask.java | 3 ++-
.../org/apache/gobblin/cluster/SingleTask.java | 7 +++++-
.../apache/gobblin/cluster/SingleTaskRunner.java | 2 +-
.../gobblin/cluster/TaskRunnerSuiteBase.java | 4 +++-
.../gobblin/yarn/GobblinApplicationMaster.java | 5 +++--
.../gobblin/yarn/GobblinYarnAppLauncher.java | 11 +++++++++
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 5 +++--
8 files changed, 55 insertions(+), 8 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 41f926e..e414074 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -28,7 +28,9 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
@@ -103,4 +105,28 @@ public class GobblinClusterUtils {
return jobStateFilePath;
}
+
+ /**
+ * Get the dynamic config from a {@link DynamicConfigGenerator}
+ * @param config input config
+ * @return the dynamic config
+ */
+ public static Config getDynamicConfig(Config config) {
+ // load dynamic configuration and add them to the job properties
+ DynamicConfigGenerator dynamicConfigGenerator =
+ DynamicConfigGeneratorFactory.createDynamicConfigGenerator(config);
+ Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(config);
+
+ return dynamicConfig;
+ }
+
+ /**
+ * Add dynamic config with higher precedence to the input config
+ * @param config input config
+ * @return a config combining the input config with the dynamic config
+ */
+ public static Config addDynamicConfig(Config config) {
+ return getDynamicConfig(config).withFallback(config);
+ }
+
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 8e608f0..e124aca 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -94,7 +94,8 @@ public class GobblinHelixTask implements Task {
jobStateFilePath,
builder.getFs(),
taskAttemptBuilder,
- stateStores);
+ stateStores,
+ builder.getDynamicConfig());
}
private void getInfoFromTaskConfig() {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 89f2bfa..94b13d3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -55,15 +55,17 @@ public class SingleTask {
private FileSystem _fs;
private TaskAttemptBuilder _taskAttemptBuilder;
private StateStores _stateStores;
+ private Config _dynamicConfig;
SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
- TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
+ TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
_jobId = jobId;
_workUnitFilePath = workUnitFilePath;
_jobStateFilePath = jobStateFilePath;
_fs = fs;
_taskAttemptBuilder = taskAttemptBuilder;
_stateStores = stateStores;
+ _dynamicConfig = dynamicConfig;
}
public void run()
@@ -71,6 +73,9 @@ public class SingleTask {
List<WorkUnit> workUnits = getWorkUnits();
JobState jobState = getJobState();
+ // Add dynamic configuration to the job state
+ _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+
Config jobConfig = getConfigFromJobState(jobState);
_logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 7eb87c5..04cd252 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -116,7 +116,7 @@ class SingleTaskRunner {
final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores);
this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
- taskAttemptBuilder, stateStores);
+ taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
}
private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores stateStores) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 92c309a..3488785 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -82,7 +82,8 @@ public abstract class TaskRunnerSuiteBase {
@Getter
public static class Builder {
- private Config config;
+ private final Config config;
+ private final Config dynamicConfig;
private HelixManager jobHelixManager;
private Optional<ContainerMetrics> containerMetrics;
private FileSystem fs;
@@ -92,6 +93,7 @@ public abstract class TaskRunnerSuiteBase {
private String instanceName;
public Builder(Config config) {
+ this.dynamicConfig = GobblinClusterUtils.getDynamicConfig(config);
this.config = config;
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index ea60c4e..fa7ee54 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -50,6 +50,7 @@ import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -72,8 +73,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
- super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(), config,
- Optional.<Path>absent());
+ super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
+ GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
if (gobblinYarnLogSource.isLogSourcePresent()) {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 514386c..5370375 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -102,6 +102,8 @@ import org.apache.gobblin.util.logs.LogCopier;
import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
/**
* A client driver to launch Gobblin as a Yarn application.
@@ -651,6 +653,15 @@ public class GobblinYarnAppLauncher {
private void setupSecurityTokens(ContainerLaunchContext containerLaunchContext) throws IOException {
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
+ // Pass on the credentials from the hadoop token file if present.
+ // The value in the token file takes precedence.
+ if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+ Credentials tokenFileCredentials = Credentials.readTokenStorageFile(new File(System.getenv(HADOOP_TOKEN_FILE_LOCATION)),
+ new Configuration());
+ credentials.addAll(tokenFileCredentials);
+ }
+
String tokenRenewer = this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException("Failed to get master Kerberos principal for the RM to use as renewer");
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 8ee12a7..77183fd 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -47,6 +47,7 @@ import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.cluster.GobblinTaskRunner;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.util.JvmUtils;
@@ -60,8 +61,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
- super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId), config,
- appWorkDirOptional);
+ super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
+ GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
}
@Override