You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:36 UTC
[15/50] [abbrv] hadoop git commit: YARN-6093. Minor bugs with
AMRMtoken renewal and state store availability when using
FederationRMFailoverProxyProvider during RM failover. (Botong Huang via
Subru).
YARN-6093. Minor bugs with AMRMtoken renewal and state store availability when using FederationRMFailoverProxyProvider during RM failover. (Botong Huang via Subru).
(cherry picked from commit 66500f4fa6155d29435d7c92fd6d68079c4cab86)
(cherry picked from commit 98b45b0ed34a060e0a529069cd15676d91600dff)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce9110ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce9110ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce9110ab
Branch: refs/heads/branch-2
Commit: ce9110ab5c4b94857080383aa6a01332f9bfd103
Parents: 002a77d
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Feb 22 13:16:22 2017 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:23:47 2017 -0700
----------------------------------------------------------------------
.../TestFederationRMFailoverProxyProvider.java | 69 +++++++++++++++
.../FederationRMFailoverProxyProvider.java | 88 +++++++++++---------
2 files changed, 118 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9110ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
index fa3523c..e3f9155 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -19,17 +19,21 @@ package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -44,6 +48,10 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Unit tests for FederationRMFailoverProxyProvider.
*/
@@ -151,4 +159,65 @@ public class TestFederationRMFailoverProxyProvider {
}
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testUGIForProxyCreation()
+ throws IOException, InterruptedException {
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ UserGroupInformation user1 =
+ UserGroupInformation.createProxyUser("user1", currentUser);
+ UserGroupInformation user2 =
+ UserGroupInformation.createProxyUser("user2", currentUser);
+
+ final TestableFederationRMFailoverProxyProvider provider =
+ new TestableFederationRMFailoverProxyProvider();
+
+ InetSocketAddress addr =
+ conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ final ClientRMProxy rmProxy = mock(ClientRMProxy.class);
+ when(rmProxy.getRMAddress(any(YarnConfiguration.class), any(Class.class)))
+ .thenReturn(addr);
+
+ user1.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() {
+ provider.init(conf, rmProxy, ApplicationMasterProtocol.class);
+ return null;
+ }
+ });
+
+ final ProxyInfo currentProxy = provider.getProxy();
+ Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+ user2.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() {
+ provider.performFailover(currentProxy.proxy);
+ return null;
+ }
+ });
+ Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+ provider.close();
+ }
+
+ protected static class TestableFederationRMFailoverProxyProvider<T>
+ extends FederationRMFailoverProxyProvider<T> {
+
+ private UserGroupInformation lastProxyUGI = null;
+
+ @Override
+ protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+ lastProxyUGI = UserGroupInformation.getCurrentUser();
+ return super.createRMProxy(rmAddress);
+ }
+
+ public UserGroupInformation getLastProxyUGI() {
+ return lastProxyUGI;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9110ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index 1915f67..e00f8d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.federation.failover;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -29,14 +29,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -64,7 +63,7 @@ public class FederationRMFailoverProxyProvider<T>
private YarnConfiguration conf;
private FederationStateStoreFacade facade;
private SubClusterId subClusterId;
- private Collection<Token<? extends TokenIdentifier>> originalTokens;
+ private UserGroupInformation originalUser;
private boolean federationFailoverEnabled = false;
@Override
@@ -97,59 +96,67 @@ public class FederationRMFailoverProxyProvider<T>
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
try {
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- originalTokens = currentUser.getTokens();
+ this.originalUser = UserGroupInformation.getCurrentUser();
LOG.info("Initialized Federation proxy for user: {}",
- currentUser.getUserName());
+ this.originalUser.getUserName());
} catch (IOException e) {
LOG.warn("Could not get information of requester, ignoring for now.");
+ this.originalUser = null;
}
}
- private void addOriginalTokens(UserGroupInformation currentUser) {
- if (originalTokens == null || originalTokens.isEmpty()) {
- return;
- }
- for (Token<? extends TokenIdentifier> token : originalTokens) {
- currentUser.addToken(token);
- }
+ @VisibleForTesting
+ protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+ return rmProxy.getProxy(conf, protocol, rmAddress);
}
private T getProxyInternal(boolean isFailover) {
SubClusterInfo subClusterInfo;
- UserGroupInformation currentUser = null;
+ // Use the existing proxy as a backup in case getting the new proxy fails.
+ // Note that if the very first attempt fails, the backup is also null. In
+ // that case we throw a YarnRuntimeException back to the caller (e.g. AM).
+ T proxy = this.current;
try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
// updating the conf with the refreshed RM addresses as proxy
- // creations
- // are based out of conf
+ // creations are based out of conf
updateRMAddress(subClusterInfo);
- currentUser = UserGroupInformation.getCurrentUser();
- addOriginalTokens(currentUser);
- } catch (YarnException e) {
+ if (this.originalUser == null) {
+ InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+ LOG.info(
+ "Connecting to {} subClusterId {} with protocol {}"
+ + " without a proxy user",
+ rmAddress, subClusterId, protocol.getSimpleName());
+ proxy = createRMProxy(rmAddress);
+ } else {
+ // If the original UGI exists, always use it to create the proxy because
+ // it contains the up-to-date AMRMToken
+ proxy = this.originalUser.doAs(new PrivilegedExceptionAction<T>() {
+ @Override
+ public T run() throws IOException {
+ InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+ LOG.info(
+ "Connecting to {} subClusterId {} with protocol {} as user {}",
+ rmAddress, subClusterId, protocol.getSimpleName(),
+ originalUser);
+ return createRMProxy(rmAddress);
+ }
+ });
+ }
+ } catch (Exception e) {
LOG.error("Exception while trying to create proxy to the ResourceManager"
+ " for SubClusterId: {}", subClusterId, e);
- return null;
- } catch (IOException e) {
- LOG.warn("Could not get information of requester, ignoring for now.");
- }
- try {
- final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
- LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
- protocol.getSimpleName(), currentUser);
- LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
- subClusterId);
- return rmProxy.getProxy(conf, protocol, rmAddress);
- } catch (IOException ioe) {
- LOG.error(
- "IOException while trying to create proxy to the ResourceManager"
- + " for SubClusterId: {}",
- subClusterId, ioe);
- return null;
+ if (proxy == null) {
+ throw new YarnRuntimeException(
+ String.format("Create initial proxy to the ResourceManager for"
+ + " SubClusterId %s failed", subClusterId),
+ e);
+ }
}
+ return proxy;
}
private void updateRMAddress(SubClusterInfo subClusterInfo) {
@@ -177,8 +184,11 @@ public class FederationRMFailoverProxyProvider<T>
@Override
public synchronized void performFailover(T currentProxy) {
- closeInternal(currentProxy);
+ // Not null: getProxyInternal yields a new or the existing proxy, or throws
current = getProxyInternal(federationFailoverEnabled);
+ if (current != currentProxy) {
+ closeInternal(currentProxy);
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org