You are viewing a plain-text version of this content. The canonical version is available at the original hyperlinked page; the link itself is not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/11 20:12:03 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5be1702  [GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers
5be1702 is described below

commit 5be170277d537b9d3f3ad738e06fce87ce001ae0
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Thu Apr 11 13:11:54 2019 -0700

    [GOBBLIN-732] Pass UGI credentials to the app master and load dynamic config in workers
    
    Closes #2599 from htran1/azkaban_yarn
---
 .../gobblin/cluster/GobblinClusterUtils.java       | 26 ++++++++++++++++++++++
 .../apache/gobblin/cluster/GobblinHelixTask.java   |  3 ++-
 .../org/apache/gobblin/cluster/SingleTask.java     |  7 +++++-
 .../apache/gobblin/cluster/SingleTaskRunner.java   |  2 +-
 .../gobblin/cluster/TaskRunnerSuiteBase.java       |  4 +++-
 .../gobblin/yarn/GobblinApplicationMaster.java     |  5 +++--
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 11 +++++++++
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |  5 +++--
 8 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 41f926e..e414074 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -28,7 +28,9 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
 
 import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
@@ -103,4 +105,28 @@ public class GobblinClusterUtils {
 
     return jobStateFilePath;
   }
+
+  /**
+   * Get the dynamic config from a {@link DynamicConfigGenerator}
+   * @param config input config
+   * @return  the dynamic config
+   */
+  public static Config getDynamicConfig(Config config) {
+    // load dynamic configuration and add them to the job properties
+    DynamicConfigGenerator dynamicConfigGenerator =
+        DynamicConfigGeneratorFactory.createDynamicConfigGenerator(config);
+    Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(config);
+
+    return dynamicConfig;
+  }
+
+  /**
+   * Add dynamic config with higher precedence to the input config
+   * @param config input config
+   * @return a config combining the input config with the dynamic config
+   */
+  public static Config addDynamicConfig(Config config) {
+    return getDynamicConfig(config).withFallback(config);
+  }
+
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 8e608f0..e124aca 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -94,7 +94,8 @@ public class GobblinHelixTask implements Task {
                                jobStateFilePath,
                                builder.getFs(),
                                taskAttemptBuilder,
-                               stateStores);
+                               stateStores,
+                               builder.getDynamicConfig());
   }
 
   private void getInfoFromTaskConfig() {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 89f2bfa..94b13d3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -55,15 +55,17 @@ public class SingleTask {
   private FileSystem _fs;
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
+  private Config _dynamicConfig;
 
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
     _fs = fs;
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
+    _dynamicConfig = dynamicConfig;
   }
 
   public void run()
@@ -71,6 +73,9 @@ public class SingleTask {
     List<WorkUnit> workUnits = getWorkUnits();
 
     JobState jobState = getJobState();
+    // Add dynamic configuration to the job state
+    _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+
     Config jobConfig = getConfigFromJobState(jobState);
 
     _logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 7eb87c5..04cd252 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -116,7 +116,7 @@ class SingleTaskRunner {
     final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores);
 
     this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs,
-        taskAttemptBuilder, stateStores);
+        taskAttemptBuilder, stateStores, GobblinClusterUtils.getDynamicConfig(this.clusterConfig));
   }
 
   private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores stateStores) {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 92c309a..3488785 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -82,7 +82,8 @@ public abstract class TaskRunnerSuiteBase {
 
   @Getter
   public static class Builder {
-    private Config config;
+    private final Config config;
+    private final Config dynamicConfig;
     private HelixManager jobHelixManager;
     private Optional<ContainerMetrics> containerMetrics;
     private FileSystem fs;
@@ -92,6 +93,7 @@ public abstract class TaskRunnerSuiteBase {
     private String instanceName;
 
     public Builder(Config config) {
+      this.dynamicConfig = GobblinClusterUtils.getDynamicConfig(config);
       this.config = config;
     }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index ea60c4e..fa7ee54 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -50,6 +50,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -72,8 +73,8 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
 
   public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
-    super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(), config,
-        Optional.<Path>absent());
+    super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
+        GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
 
     GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
     if (gobblinYarnLogSource.isLogSourcePresent()) {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 514386c..5370375 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -102,6 +102,8 @@ import org.apache.gobblin.util.logs.LogCopier;
 import org.apache.gobblin.yarn.event.ApplicationReportArrivalEvent;
 import org.apache.gobblin.yarn.event.GetApplicationReportFailureEvent;
 
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
 
 /**
  * A client driver to launch Gobblin as a Yarn application.
@@ -651,6 +653,15 @@ public class GobblinYarnAppLauncher {
 
   private void setupSecurityTokens(ContainerLaunchContext containerLaunchContext) throws IOException {
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+
+    // Pass on the credentials from the hadoop token file if present.
+    // The value in the token file takes precedence.
+    if (System.getenv(HADOOP_TOKEN_FILE_LOCATION) != null) {
+      Credentials tokenFileCredentials = Credentials.readTokenStorageFile(new File(System.getenv(HADOOP_TOKEN_FILE_LOCATION)),
+          new Configuration());
+      credentials.addAll(tokenFileCredentials);
+    }
+
     String tokenRenewer = this.yarnConfiguration.get(YarnConfiguration.RM_PRINCIPAL);
     if (tokenRenewer == null || tokenRenewer.length() == 0) {
       throw new IOException("Failed to get master Kerberos principal for the RM to use as renewer");
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 8ee12a7..77183fd 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -47,6 +47,7 @@ import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.cluster.GobblinTaskRunner;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.util.JvmUtils;
@@ -60,8 +61,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
-    super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId), config,
-        appWorkDirOptional);
+    super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
+        GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
   }
 
   @Override