You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/08/05 23:16:26 UTC

[gobblin] branch master updated: [GOBBLIN-1500]Support gobblin on yarn to be able to run on clusters with federated Yarn cluster enabled (#3345)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d96b2d2  [GOBBLIN-1500]Support gobblin on yarn to be able to run on clusters with federated Yarn cluster enabled  (#3345)
d96b2d2 is described below

commit d96b2d203ea4b761e0adf91d9872de846105e8d9
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu Aug 5 16:16:18 2021 -0700

    [GOBBLIN-1500]Support gobblin on yarn to be able to run on clusters with federated Yarn cluster enabled  (#3345)
    
    * [GOBBLIN-1500]Support gobblin on yarn to be able to run on clusters with robin enabled (WIP)
    
    * address comments
    
    * address comments
    
    * Enable add dynamic property feature in azkabanGobblinYarnAppLauncher
---
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |  8 ++
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 88 ++++++++++++++--------
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |  2 +
 3 files changed, 68 insertions(+), 30 deletions(-)

diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 0104f43..5278092 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -64,6 +64,8 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
       throws IOException {
     super(jobId, LOGGER);
 
+    addRuntimeProperties(gobblinProps);
+
     Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
 
     //Suppress logs from classes that emit Yarn application Id that Azkaban uses to kill the application.
@@ -98,6 +100,12 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
     return new YarnConfiguration();
   }
 
+  /**
+   * Subclasses can override this method to add runtime properties before the Gobblin config is built.
+   */
+  protected void addRuntimeProperties(Properties gobblinProps) {
+  }
+
   @Override
   public void run() throws Exception {
     this.gobblinYarnAppLauncher.launch();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01cb964..c0cde51 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -45,8 +46,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -191,7 +195,6 @@ public class GobblinYarnAppLauncher {
   private final HelixManager helixManager;
 
   private final Configuration yarnConfiguration;
-  private final YarnClient yarnClient;
   private final FileSystem fs;
 
   private final EventBus eventBus = new EventBus(GobblinYarnAppLauncher.class.getSimpleName());
@@ -235,6 +238,9 @@ public class GobblinYarnAppLauncher {
 
   private final String containerTimezone;
   private final String appLauncherMode;
+  private final String originalYarnRMAddress;
+  private final Map<String, YarnClient> potentialYarnClients;
+  private YarnClient yarnClient;
 
   public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
     this.config = config;
@@ -253,8 +259,16 @@ public class GobblinYarnAppLauncher {
     YarnHelixUtils.setYarnClassPath(config, this.yarnConfiguration);
     YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
     this.yarnConfiguration.set("fs.automatic.close", "false");
-    this.yarnClient = YarnClient.createYarnClient();
-    this.yarnClient.init(this.yarnConfiguration);
+    this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
+    this.potentialYarnClients = new HashMap();
+    Set<String> potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
+    potentialRMAddresses.add(originalYarnRMAddress);
+    for (String rmAddress : potentialRMAddresses) {
+      YarnClient tmpYarnClient = YarnClient.createYarnClient();
+      this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
+      tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
+      potentialYarnClients.put(rmAddress, tmpYarnClient);
+    }
 
     this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
     this.closer.register(this.fs);
@@ -340,20 +354,20 @@ public class GobblinYarnAppLauncher {
 
     connectHelixManager();
 
-    startYarnClient();
-
-    Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
-
-    boolean isReconnected = reconnectableApplicationId.isPresent();
-    // Before setup application, first login to make sure ugi has the right token.
+    // Before connecting with the Yarn clients, we need to log in to get the token.
     if(ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
-      this.securityManager = Optional.of(buildSecurityManager(isReconnected));
+      this.securityManager = Optional.of(buildSecurityManager());
       this.securityManager.get().loginAndScheduleTokenRenewal();
     }
 
-    if (!reconnectableApplicationId.isPresent()) {
+    startYarnClient();
+
+    this.applicationId = getReconnectableApplicationId();
+
+    if (!this.applicationId.isPresent()) {
       disableLiveHelixInstances();
       LOGGER.info("No reconnectable application found so submitting a new application");
+      this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
       this.applicationId = Optional.of(setupAndSubmitApplication());
     }
 
@@ -549,12 +563,16 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   void startYarnClient() {
-    this.yarnClient.start();
+    for (YarnClient yarnClient : potentialYarnClients.values()) {
+      yarnClient.start();
+    }
   }
 
   @VisibleForTesting
   void stopYarnClient() {
-    this.yarnClient.stop();
+    for (YarnClient yarnClient : potentialYarnClients.values()) {
+      yarnClient.stop();
+    }
   }
 
   /**
@@ -574,19 +592,21 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
-    List<ApplicationReport> applicationReports =
-        this.yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
-    if (applicationReports == null || applicationReports.isEmpty()) {
-      return Optional.absent();
-    }
+    for (YarnClient yarnClient: potentialYarnClients.values()) {
+      List<ApplicationReport> applicationReports = yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
+      if (applicationReports == null || applicationReports.isEmpty()) {
+        continue;
+      }
 
-    // Try to find an application with a matching application name
-    for (ApplicationReport applicationReport : applicationReports) {
-      if (this.applicationName.equals(applicationReport.getName())) {
-        String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
-        LOGGER.info("Found reconnectable application with application ID: " + applicationId);
-        LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
-        return Optional.of(applicationReport.getApplicationId());
+      // Try to find an application with a matching application name
+      for (ApplicationReport applicationReport : applicationReports) {
+        if (this.applicationName.equals(applicationReport.getName())) {
+          String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
+          LOGGER.info("Found reconnectable application with application ID: " + applicationId);
+          LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
+          this.yarnClient = yarnClient;
+          return Optional.of(applicationReport.getApplicationId());
+        }
       }
     }
 
@@ -835,14 +855,22 @@ public class GobblinYarnAppLauncher {
 
     TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
         Optional.absent(), ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
-
+    // Only pass tokens here, not secrets, since there is no simple way to remove a single token or to get the secrets.
+    // For RM tokens, only pass the token for the current RM; otherwise the RM will fail to update the token.
+    Credentials finalCredentials = new Credentials();
+    for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+      if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && !token.getService().equals(new Text(this.originalYarnRMAddress))) {
+        continue;
+      }
+      finalCredentials.addToken(token.getService(), token);
+    }
     Closer closer = Closer.create();
     try {
       DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
-      credentials.writeTokenStorageToStream(dataOutputBuffer);
+      finalCredentials.writeTokenStorageToStream(dataOutputBuffer);
       ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
       containerLaunchContext.setTokens(fsTokens);
-      LOGGER.info("Setting containerLaunchContext with All credential tokens: " + credentials.getAllTokens());
+      LOGGER.info("Setting containerLaunchContext with All credential tokens: " + finalCredentials.getAllTokens());
     } catch (Throwable t) {
       throw closer.rethrow(t);
     } finally {
@@ -872,7 +900,7 @@ public class GobblinYarnAppLauncher {
     return logRootDir;
   }
 
-  private AbstractYarnAppSecurityManager buildSecurityManager(boolean isReconnected) throws IOException {
+  private AbstractYarnAppSecurityManager buildSecurityManager() throws IOException {
     Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
         GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
 
@@ -881,7 +909,7 @@ public class GobblinYarnAppLauncher {
     try {
      return (AbstractYarnAppSecurityManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
           ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, this.helixManager, this.fs,
-          tokenFilePath, isReconnected);
+          tokenFilePath);
     } catch (ReflectiveOperationException e) {
       throw new IOException(e);
     }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 956848e..fcc30d3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -37,6 +37,8 @@ public class GobblinYarnConfigurationKeys {
   public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
   public static final String APP_VIEW_ACL = GOBBLIN_YARN_PREFIX + "appViewAcl";
   public static final String DEFAULT_APP_VIEW_ACL = "*";
+  public static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+  public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES= "other.yarn.resourcemanager.addresses";
 
   // Gobblin Yarn ApplicationMaster configuration properties.
   public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + "app.master.memory.mbs";