You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 00:58:36 UTC

[15/50] [abbrv] hadoop git commit: YARN-6093. Minor bugs with AMRMtoken renewal and state store availability when using FederationRMFailoverProxyProvider during RM failover. (Botong Huang via Subru).

YARN-6093. Minor bugs with AMRMtoken renewal and state store availability when using FederationRMFailoverProxyProvider during RM failover. (Botong Huang via Subru).

(cherry picked from commit 66500f4fa6155d29435d7c92fd6d68079c4cab86)
(cherry picked from commit 98b45b0ed34a060e0a529069cd15676d91600dff)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ce9110ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ce9110ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ce9110ab

Branch: refs/heads/branch-2
Commit: ce9110ab5c4b94857080383aa6a01332f9bfd103
Parents: 002a77d
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Feb 22 13:16:22 2017 -0800
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 16:23:47 2017 -0700

----------------------------------------------------------------------
 .../TestFederationRMFailoverProxyProvider.java  | 69 +++++++++++++++
 .../FederationRMFailoverProxyProvider.java      | 88 +++++++++++---------
 2 files changed, 118 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9110ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
index fa3523c..e3f9155 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -19,17 +19,21 @@ package org.apache.hadoop.yarn.client;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -44,6 +48,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Unit tests for FederationRMFailoverProxyProvider.
  */
@@ -151,4 +159,65 @@ public class TestFederationRMFailoverProxyProvider {
     }
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testUGIForProxyCreation()
+      throws IOException, InterruptedException {
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    UserGroupInformation user1 =
+        UserGroupInformation.createProxyUser("user1", currentUser);
+    UserGroupInformation user2 =
+        UserGroupInformation.createProxyUser("user2", currentUser);
+
+    final TestableFederationRMFailoverProxyProvider provider =
+        new TestableFederationRMFailoverProxyProvider();
+
+    InetSocketAddress addr =
+        conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    final ClientRMProxy rmProxy = mock(ClientRMProxy.class);
+    when(rmProxy.getRMAddress(any(YarnConfiguration.class), any(Class.class)))
+        .thenReturn(addr);
+
+    user1.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        provider.init(conf, rmProxy, ApplicationMasterProtocol.class);
+        return null;
+      }
+    });
+
+    final ProxyInfo currentProxy = provider.getProxy();
+    Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+    user2.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        provider.performFailover(currentProxy.proxy);
+        return null;
+      }
+    });
+    Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+    provider.close();
+  }
+
+  protected static class TestableFederationRMFailoverProxyProvider<T>
+      extends FederationRMFailoverProxyProvider<T> {
+
+    private UserGroupInformation lastProxyUGI = null;
+
+    @Override
+    protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+      lastProxyUGI = UserGroupInformation.getCurrentUser();
+      return super.createRMProxy(rmAddress);
+    }
+
+    public UserGroupInformation getLastProxyUGI() {
+      return lastProxyUGI;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce9110ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
index 1915f67..e00f8d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.federation.failover;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -29,14 +29,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -64,7 +63,7 @@ public class FederationRMFailoverProxyProvider<T>
   private YarnConfiguration conf;
   private FederationStateStoreFacade facade;
   private SubClusterId subClusterId;
-  private Collection<Token<? extends TokenIdentifier>> originalTokens;
+  private UserGroupInformation originalUser;
   private boolean federationFailoverEnabled = false;
 
   @Override
@@ -97,59 +96,67 @@ public class FederationRMFailoverProxyProvider<T>
             YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
 
     try {
-      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-      originalTokens = currentUser.getTokens();
+      this.originalUser = UserGroupInformation.getCurrentUser();
       LOG.info("Initialized Federation proxy for user: {}",
-          currentUser.getUserName());
+          this.originalUser.getUserName());
     } catch (IOException e) {
       LOG.warn("Could not get information of requester, ignoring for now.");
+      this.originalUser = null;
     }
 
   }
 
-  private void addOriginalTokens(UserGroupInformation currentUser) {
-    if (originalTokens == null || originalTokens.isEmpty()) {
-      return;
-    }
-    for (Token<? extends TokenIdentifier> token : originalTokens) {
-      currentUser.addToken(token);
-    }
+  @VisibleForTesting
+  protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+    return rmProxy.getProxy(conf, protocol, rmAddress);
   }
 
   private T getProxyInternal(boolean isFailover) {
     SubClusterInfo subClusterInfo;
-    UserGroupInformation currentUser = null;
+    // Use the existing proxy as a backup in case getting the new proxy fails.
+    // Note that if the first time it fails, the backup is also null. In that
+    // case we will hit NullPointerException and throw it back to AM.
+    T proxy = this.current;
     try {
       LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
           subClusterId);
       subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
       // updating the conf with the refreshed RM addresses as proxy
-      // creations
-      // are based out of conf
+      // creations are based out of conf
       updateRMAddress(subClusterInfo);
-      currentUser = UserGroupInformation.getCurrentUser();
-      addOriginalTokens(currentUser);
-    } catch (YarnException e) {
+      if (this.originalUser == null) {
+        InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+        LOG.info(
+            "Connecting to {} subClusterId {} with protocol {}"
+                + " without a proxy user",
+            rmAddress, subClusterId, protocol.getSimpleName());
+        proxy = createRMProxy(rmAddress);
+      } else {
+        // If the original ugi exists, always use that to create proxy because
+        // it contains up-to-date AMRMToken
+        proxy = this.originalUser.doAs(new PrivilegedExceptionAction<T>() {
+          @Override
+          public T run() throws IOException {
+            InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+            LOG.info(
+                "Connecting to {} subClusterId {} with protocol {} as user {}",
+                rmAddress, subClusterId, protocol.getSimpleName(),
+                originalUser);
+            return createRMProxy(rmAddress);
+          }
+        });
+      }
+    } catch (Exception e) {
       LOG.error("Exception while trying to create proxy to the ResourceManager"
           + " for SubClusterId: {}", subClusterId, e);
-      return null;
-    } catch (IOException e) {
-      LOG.warn("Could not get information of requester, ignoring for now.");
-    }
-    try {
-      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
-      LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
-          protocol.getSimpleName(), currentUser);
-      LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
-          subClusterId);
-      return rmProxy.getProxy(conf, protocol, rmAddress);
-    } catch (IOException ioe) {
-      LOG.error(
-          "IOException while trying to create proxy to the ResourceManager"
-              + " for SubClusterId: {}",
-          subClusterId, ioe);
-      return null;
+      if (proxy == null) {
+        throw new YarnRuntimeException(
+            String.format("Create initial proxy to the ResourceManager for"
+                + " SubClusterId %s failed", subClusterId),
+            e);
+      }
     }
+    return proxy;
   }
 
   private void updateRMAddress(SubClusterInfo subClusterInfo) {
@@ -177,8 +184,11 @@ public class FederationRMFailoverProxyProvider<T>
 
   @Override
   public synchronized void performFailover(T currentProxy) {
-    closeInternal(currentProxy);
+    // It will not return null proxy here
     current = getProxyInternal(federationFailoverEnabled);
+    if (current != currentProxy) {
+      closeInternal(currentProxy);
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org