You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/08/05 23:16:26 UTC
[gobblin] branch master updated: [GOBBLIN-1500] Support running Gobblin on YARN
on clusters with YARN federation enabled
(#3345)
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d96b2d2 [GOBBLIN-1500] Support running Gobblin on YARN on clusters with YARN federation enabled (#3345)
d96b2d2 is described below
commit d96b2d203ea4b761e0adf91d9872de846105e8d9
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Thu Aug 5 16:16:18 2021 -0700
[GOBBLIN-1500] Support running Gobblin on YARN on clusters with YARN federation enabled (#3345)
* [GOBBLIN-1500]Support gobblin on yarn to be able to run on clusters with robin enabled (WIP)
* address comments
* address comments
* Allow subclasses of AzkabanGobblinYarnAppLauncher to add runtime properties dynamically
---
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 8 ++
.../gobblin/yarn/GobblinYarnAppLauncher.java | 88 ++++++++++++++--------
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 2 +
3 files changed, 68 insertions(+), 30 deletions(-)
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 0104f43..5278092 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -64,6 +64,8 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
throws IOException {
super(jobId, LOGGER);
+ addRuntimeProperties(gobblinProps);
+
Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
//Suppress logs from classes that emit Yarn application Id that Azkaban uses to kill the application.
@@ -98,6 +100,12 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
return new YarnConfiguration();
}
+ /**
+ * Extended class can override this method to add some runtime properties.
+ */
+ protected void addRuntimeProperties(Properties gobblinProps) {
+ }
+
@Override
public void run() throws Exception {
this.gobblinYarnAppLauncher.launch();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01cb964..c0cde51 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -45,8 +46,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -191,7 +195,6 @@ public class GobblinYarnAppLauncher {
private final HelixManager helixManager;
private final Configuration yarnConfiguration;
- private final YarnClient yarnClient;
private final FileSystem fs;
private final EventBus eventBus = new EventBus(GobblinYarnAppLauncher.class.getSimpleName());
@@ -235,6 +238,9 @@ public class GobblinYarnAppLauncher {
private final String containerTimezone;
private final String appLauncherMode;
+ private final String originalYarnRMAddress;
+ private final Map<String, YarnClient> potentialYarnClients;
+ private YarnClient yarnClient;
public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
this.config = config;
@@ -253,8 +259,16 @@ public class GobblinYarnAppLauncher {
YarnHelixUtils.setYarnClassPath(config, this.yarnConfiguration);
YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
this.yarnConfiguration.set("fs.automatic.close", "false");
- this.yarnClient = YarnClient.createYarnClient();
- this.yarnClient.init(this.yarnConfiguration);
+ this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
+ this.potentialYarnClients = new HashMap();
+ Set<String> potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
+ potentialRMAddresses.add(originalYarnRMAddress);
+ for (String rmAddress : potentialRMAddresses) {
+ YarnClient tmpYarnClient = YarnClient.createYarnClient();
+ this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
+ tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
+ potentialYarnClients.put(rmAddress, tmpYarnClient);
+ }
this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
this.closer.register(this.fs);
@@ -340,20 +354,20 @@ public class GobblinYarnAppLauncher {
connectHelixManager();
- startYarnClient();
-
- Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
-
- boolean isReconnected = reconnectableApplicationId.isPresent();
- // Before setup application, first login to make sure ugi has the right token.
+ //Before connect with yarn client, we need to login to get the token
if(ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
- this.securityManager = Optional.of(buildSecurityManager(isReconnected));
+ this.securityManager = Optional.of(buildSecurityManager());
this.securityManager.get().loginAndScheduleTokenRenewal();
}
- if (!reconnectableApplicationId.isPresent()) {
+ startYarnClient();
+
+ this.applicationId = getReconnectableApplicationId();
+
+ if (!this.applicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new application");
+ this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
}
@@ -549,12 +563,16 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
void startYarnClient() {
- this.yarnClient.start();
+ for (YarnClient yarnClient : potentialYarnClients.values()) {
+ yarnClient.start();
+ }
}
@VisibleForTesting
void stopYarnClient() {
- this.yarnClient.stop();
+ for (YarnClient yarnClient : potentialYarnClients.values()) {
+ yarnClient.stop();
+ }
}
/**
@@ -574,19 +592,21 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
- List<ApplicationReport> applicationReports =
- this.yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
- if (applicationReports == null || applicationReports.isEmpty()) {
- return Optional.absent();
- }
+ for (YarnClient yarnClient: potentialYarnClients.values()) {
+ List<ApplicationReport> applicationReports = yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
+ if (applicationReports == null || applicationReports.isEmpty()) {
+ continue;
+ }
- // Try to find an application with a matching application name
- for (ApplicationReport applicationReport : applicationReports) {
- if (this.applicationName.equals(applicationReport.getName())) {
- String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
- LOGGER.info("Found reconnectable application with application ID: " + applicationId);
- LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
- return Optional.of(applicationReport.getApplicationId());
+ // Try to find an application with a matching application name
+ for (ApplicationReport applicationReport : applicationReports) {
+ if (this.applicationName.equals(applicationReport.getName())) {
+ String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
+ LOGGER.info("Found reconnectable application with application ID: " + applicationId);
+ LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
+ this.yarnClient = yarnClient;
+ return Optional.of(applicationReport.getApplicationId());
+ }
}
}
@@ -835,14 +855,22 @@ public class GobblinYarnAppLauncher {
TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
Optional.absent(), ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
-
+ // Only pass token here and no secrets. (since there is no simple way to remove single token/ get secrets)
+ // For RM token, only pass the RM token for the current RM, or the RM will fail to update the token
+ Credentials finalCredentials = new Credentials();
+ for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+ if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && !token.getService().equals(new Text(this.originalYarnRMAddress))) {
+ continue;
+ }
+ finalCredentials.addToken(token.getService(), token);
+ }
Closer closer = Closer.create();
try {
DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
- credentials.writeTokenStorageToStream(dataOutputBuffer);
+ finalCredentials.writeTokenStorageToStream(dataOutputBuffer);
ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
containerLaunchContext.setTokens(fsTokens);
- LOGGER.info("Setting containerLaunchContext with All credential tokens: " + credentials.getAllTokens());
+ LOGGER.info("Setting containerLaunchContext with All credential tokens: " + finalCredentials.getAllTokens());
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
@@ -872,7 +900,7 @@ public class GobblinYarnAppLauncher {
return logRootDir;
}
- private AbstractYarnAppSecurityManager buildSecurityManager(boolean isReconnected) throws IOException {
+ private AbstractYarnAppSecurityManager buildSecurityManager() throws IOException {
Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
@@ -881,7 +909,7 @@ public class GobblinYarnAppLauncher {
try {
return (AbstractYarnAppSecurityManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, this.helixManager, this.fs,
- tokenFilePath, isReconnected);
+ tokenFilePath);
} catch (ReflectiveOperationException e) {
throw new IOException(e);
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 956848e..fcc30d3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -37,6 +37,8 @@ public class GobblinYarnConfigurationKeys {
public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
public static final String APP_VIEW_ACL = GOBBLIN_YARN_PREFIX + "appViewAcl";
public static final String DEFAULT_APP_VIEW_ACL = "*";
+ public static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+ public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES= "other.yarn.resourcemanager.addresses";
// Gobblin Yarn ApplicationMaster configuration properties.
public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + "app.master.memory.mbs";