You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/06/14 17:04:51 UTC
[hadoop] branch HDFS-13891 updated: HDFS-14545. RBF: Router should
support GetUserMappingsProtocol. Contributed by Ayush Saxena.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-13891 by this push:
new caa285b HDFS-14545. RBF: Router should support GetUserMappingsProtocol. Contributed by Ayush Saxena.
caa285b is described below
commit caa285b04532d58abb22f25bd5138fca5a08756c
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Fri Jun 14 10:04:38 2019 -0700
HDFS-14545. RBF: Router should support GetUserMappingsProtocol. Contributed by Ayush Saxena.
---
.../server/federation/router/ConnectionPool.java | 152 +++++++++------------
.../server/federation/router/RouterRpcServer.java | 32 +++--
.../federation/router/RouterUserProtocol.java | 104 ++++++++++++++
.../server/federation/router/TestRouterRpc.java | 10 ++
...ithRouters.java => TestRouterUserMappings.java} | 90 ++++++------
5 files changed, 254 insertions(+), 134 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index f868521..a624814 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -48,9 +50,15 @@ import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
@@ -97,6 +105,32 @@ public class ConnectionPool {
/** The last time a connection was active. */
private volatile long lastActiveTime = 0;
+ /** Map for the protocols and their protobuf implementations. */
+ private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
+ static {
+ PROTO_MAP.put(ClientProtocol.class,
+ new ProtoImpl(ClientNamenodeProtocolPB.class,
+ ClientNamenodeProtocolTranslatorPB.class));
+ PROTO_MAP.put(NamenodeProtocol.class, new ProtoImpl(
+ NamenodeProtocolPB.class, NamenodeProtocolTranslatorPB.class));
+ PROTO_MAP.put(RefreshUserMappingsProtocol.class,
+ new ProtoImpl(RefreshUserMappingsProtocolPB.class,
+ RefreshUserMappingsProtocolClientSideTranslatorPB.class));
+ PROTO_MAP.put(GetUserMappingsProtocol.class,
+ new ProtoImpl(GetUserMappingsProtocolPB.class,
+ GetUserMappingsProtocolClientSideTranslatorPB.class));
+ }
+
+ /** Class to store the protocol implementation. */
+ private static class ProtoImpl {
+ private final Class<?> protoPb;
+ private final Class<?> protoClientPb;
+
+ ProtoImpl(Class<?> pPb, Class<?> pClientPb) {
+ this.protoPb = pPb;
+ this.protoClientPb = pClientPb;
+ }
+ }
protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize,
@@ -325,6 +359,7 @@ public class ConnectionPool {
* context for a single user/security context. To maximize throughput it is
* recommended to use multiple connection per user+server, allowing multiple
* writes and reads to be dispatched in parallel.
+ * @param <T> Type of the protocol.
*
* @param conf Configuration for the connection.
* @param nnAddress Address of server supporting the ClientProtocol.
@@ -334,47 +369,19 @@ public class ConnectionPool {
* security context.
* @throws IOException If it cannot be created.
*/
- protected static ConnectionContext newConnection(Configuration conf,
- String nnAddress, UserGroupInformation ugi, Class<?> proto)
- throws IOException {
- ConnectionContext ret;
- if (proto == ClientProtocol.class) {
- ret = newClientConnection(conf, nnAddress, ugi);
- } else if (proto == NamenodeProtocol.class) {
- ret = newNamenodeConnection(conf, nnAddress, ugi);
- } else {
- String msg = "Unsupported protocol for connection to NameNode: " +
- ((proto != null) ? proto.getClass().getName() : "null");
+ protected static <T> ConnectionContext newConnection(Configuration conf,
+ String nnAddress, UserGroupInformation ugi, Class<T> proto)
+ throws IOException {
+ if (!PROTO_MAP.containsKey(proto)) {
+ String msg = "Unsupported protocol for connection to NameNode: "
+ + ((proto != null) ? proto.getClass().getName() : "null");
LOG.error(msg);
throw new IllegalStateException(msg);
}
- return ret;
- }
+ ProtoImpl classes = PROTO_MAP.get(proto);
+ RPC.setProtocolEngine(conf, classes.protoPb, ProtobufRpcEngine.class);
- /**
- * Creates a proxy wrapper for a client NN connection. Each proxy contains
- * context for a single user/security context. To maximize throughput it is
- * recommended to use multiple connection per user+server, allowing multiple
- * writes and reads to be dispatched in parallel.
- *
- * Mostly based on NameNodeProxies#createNonHAProxy() but it needs the
- * connection identifier.
- *
- * @param conf Configuration for the connection.
- * @param nnAddress Address of server supporting the ClientProtocol.
- * @param ugi User context.
- * @return Proxy for the target ClientProtocol that contains the user's
- * security context.
- * @throws IOException If it cannot be created.
- */
- private static ConnectionContext newClientConnection(
- Configuration conf, String nnAddress, UserGroupInformation ugi)
- throws IOException {
- RPC.setProtocolEngine(
- conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
-
- final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
- conf,
+ final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
@@ -386,61 +393,32 @@ public class ConnectionPool {
SaslRpcServer.init(conf);
}
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
- final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
- ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
- ClientNamenodeProtocolPB.class, version, socket, ugi, conf,
- factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
- ClientProtocol client = new ClientNamenodeProtocolTranslatorPB(proxy);
+ final long version = RPC.getProtocolVersion(classes.protoPb);
+ Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
+ conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
+ T client = newProtoClient(proto, classes, proxy);
Text dtService = SecurityUtil.buildTokenService(socket);
- ProxyAndInfo<ClientProtocol> clientProxy =
- new ProxyAndInfo<ClientProtocol>(client, dtService, socket);
+ ProxyAndInfo<T> clientProxy =
+ new ProxyAndInfo<T>(client, dtService, socket);
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
}
- /**
- * Creates a proxy wrapper for a NN connection. Each proxy contains context
- * for a single user/security context. To maximize throughput it is
- * recommended to use multiple connection per user+server, allowing multiple
- * writes and reads to be dispatched in parallel.
- *
- * @param conf Configuration for the connection.
- * @param nnAddress Address of server supporting the ClientProtocol.
- * @param ugi User context.
- * @return Proxy for the target NamenodeProtocol that contains the user's
- * security context.
- * @throws IOException If it cannot be created.
- */
- private static ConnectionContext newNamenodeConnection(
- Configuration conf, String nnAddress, UserGroupInformation ugi)
- throws IOException {
- RPC.setProtocolEngine(
- conf, NamenodeProtocolPB.class, ProtobufRpcEngine.class);
-
- final RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(
- conf,
- HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
- HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
- HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
- HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
- HdfsConstants.SAFEMODE_EXCEPTION_CLASS_NAME);
-
- SocketFactory factory = SocketFactory.getDefault();
- if (UserGroupInformation.isSecurityEnabled()) {
- SaslRpcServer.init(conf);
+ private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
+ Object proxy) {
+ try {
+ Constructor<?> constructor =
+ classes.protoClientPb.getConstructor(classes.protoPb);
+ Object o = constructor.newInstance(new Object[] {proxy});
+ if (proto.isAssignableFrom(o.getClass())) {
+ @SuppressWarnings("unchecked")
+ T client = (T) o;
+ return client;
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
}
- InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
- final long version = RPC.getProtocolVersion(NamenodeProtocolPB.class);
- NamenodeProtocolPB proxy = RPC.getProtocolProxy(NamenodeProtocolPB.class,
- version, socket, ugi, conf,
- factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
- NamenodeProtocol client = new NamenodeProtocolTranslatorPB(proxy);
- Text dtService = SecurityUtil.buildTokenService(socket);
-
- ProxyAndInfo<NamenodeProtocol> clientProxy =
- new ProxyAndInfo<NamenodeProtocol>(client, dtService, socket);
- ConnectionContext connection = new ConnectionContext(clientProxy);
- return connection;
+ return null;
}
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 6facd7e..f33d1d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -139,15 +139,17 @@ import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
+import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,8 +165,8 @@ import com.google.protobuf.BlockingService;
* the requests to the active
* {@link org.apache.hadoop.hdfs.server.namenode.NameNode NameNode}.
*/
-public class RouterRpcServer extends AbstractService
- implements ClientProtocol, NamenodeProtocol, RefreshUserMappingsProtocol {
+public class RouterRpcServer extends AbstractService implements ClientProtocol,
+ NamenodeProtocol, RefreshUserMappingsProtocol, GetUserMappingsProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
@@ -207,6 +209,8 @@ public class RouterRpcServer extends AbstractService
private final RouterNamenodeProtocol nnProto;
/** ClientProtocol calls. */
private final RouterClientProtocol clientProto;
+ /** User mappings protocol calls (refresh and get groups). */
+ private final RouterUserProtocol routerProto;
/** Router security manager to handle token operations. */
private RouterSecurityManager securityManager = null;
/** Super user credentials that a thread may use. */
@@ -269,6 +273,12 @@ public class RouterRpcServer extends AbstractService
RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService.
newReflectiveBlockingService(refreshUserMappingXlator);
+ GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
+ new GetUserMappingsProtocolServerSideTranslatorPB(this);
+ BlockingService getUserMappingService =
+ GetUserMappingsProtocolProtos.GetUserMappingsProtocolService.
+ newReflectiveBlockingService(getUserMappingXlator);
+
InetSocketAddress confRpcAddress = conf.getSocketAddr(
RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY,
RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
@@ -297,6 +307,8 @@ public class RouterRpcServer extends AbstractService
conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, this.rpcServer);
+ DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+ getUserMappingService, this.rpcServer);
// Set service-level authorization security policy
this.serviceAuthEnabled = conf.getBoolean(
@@ -346,6 +358,7 @@ public class RouterRpcServer extends AbstractService
this.quotaCall = new Quota(this.router, this);
this.nnProto = new RouterNamenodeProtocol(this);
this.clientProto = new RouterClientProtocol(conf, this);
+ this.routerProto = new RouterUserProtocol(this);
}
@Override
@@ -1706,13 +1719,16 @@ public class RouterRpcServer extends AbstractService
@Override
public void refreshUserToGroupsMappings() throws IOException {
- LOG.info("Refresh user groups mapping in Router.");
- Groups.getUserToGroupsMappingService().refresh();
+ routerProto.refreshUserToGroupsMappings();
}
@Override
public void refreshSuperUserGroupsConfiguration() throws IOException {
- LOG.info("Refresh superuser groups configuration in Router.");
- ProxyUsers.refreshSuperUserGroupsConfiguration();
+ routerProto.refreshSuperUserGroupsConfiguration();
+ }
+
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ return routerProto.getGroupsForUser(user);
}
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java
new file mode 100644
index 0000000..742991e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterUserProtocol.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer.merge;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
+import org.apache.hadoop.security.Groups;
+import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Module that implements all the RPC calls in
+ * {@link RefreshUserMappingsProtocol} and {@link GetUserMappingsProtocol}
+ * in the {@link RouterRpcServer}.
+ */
+public class RouterUserProtocol
+ implements RefreshUserMappingsProtocol, GetUserMappingsProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterUserProtocol.class);
+
+ /** RPC server to receive client calls. */
+ private final RouterRpcServer rpcServer;
+ /** RPC clients to connect to the Namenodes. */
+ private final RouterRpcClient rpcClient;
+
+ private final ActiveNamenodeResolver namenodeResolver;
+
+ public RouterUserProtocol(RouterRpcServer server) {
+ this.rpcServer = server;
+ this.rpcClient = this.rpcServer.getRPCClient();
+ this.namenodeResolver = this.rpcServer.getNamenodeResolver();
+ }
+
+ @Override
+ public void refreshUserToGroupsMappings() throws IOException {
+ LOG.debug("Refresh user groups mapping in Router.");
+ rpcServer.checkOperation(OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ Groups.getUserToGroupsMappingService().refresh();
+ } else {
+ RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+ "refreshUserToGroupsMappings");
+ rpcClient.invokeConcurrent(nss, method);
+ }
+ }
+
+ @Override
+ public void refreshSuperUserGroupsConfiguration() throws IOException {
+ LOG.debug("Refresh superuser groups configuration in Router.");
+ rpcServer.checkOperation(OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ ProxyUsers.refreshSuperUserGroupsConfiguration();
+ } else {
+ RemoteMethod method = new RemoteMethod(RefreshUserMappingsProtocol.class,
+ "refreshSuperUserGroupsConfiguration");
+ rpcClient.invokeConcurrent(nss, method);
+ }
+ }
+
+ @Override
+ public String[] getGroupsForUser(String user) throws IOException {
+ LOG.debug("Getting groups for user {}", user);
+ rpcServer.checkOperation(OperationCategory.UNCHECKED);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ if (nss.isEmpty()) {
+ return UserGroupInformation.createRemoteUser(user).getGroupNames();
+ } else {
+ RemoteMethod method = new RemoteMethod(GetUserMappingsProtocol.class,
+ "getGroupsForUser", new Class<?>[] {String.class}, user);
+ Map<FederationNamespaceInfo, String[]> results =
+ rpcClient.invokeConcurrent(nss, method, String[].class);
+ return merge(results, String.class);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index e656e7a..a07daef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -1631,6 +1631,16 @@ public class TestRouterRpc {
assertFalse(routerDFS.listCacheDirectives(filter).hasNext());
}
+ @Test
+ public void testgetGroupsForUser() throws IOException {
+ String[] group = new String[] {"bar", "group2"};
+ UserGroupInformation.createUserForTesting("user",
+ new String[] {"bar", "group2"});
+ String[] result =
+ router.getRouter().getRpcServer().getGroupsForUser("user");
+ assertArrayEquals(group, result);
+ }
+
/**
* Check the erasure coding policies in the Router and the Namenode.
* @return The erasure coding policies.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java
similarity index 86%
rename from hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java
rename to hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java
index 597b8c2..dc7ebbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRefreshUserMappingsWithRouters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterUserMappings.java
@@ -20,14 +20,16 @@ package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
@@ -36,16 +38,19 @@ import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URL;
@@ -54,18 +59,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * Tests RefreshUserMappingsProtocol With Routers.
+ * Test RefreshUserMappingsProtocol and GetUserMappingsProtocol with Routers.
*/
-public class TestRefreshUserMappingsWithRouters {
+public class TestRouterUserMappings {
private static final Logger LOG = LoggerFactory.getLogger(
- TestRefreshUserMappingsWithRouters.class);
+ TestRouterUserMappings.class);
private MiniRouterDFSCluster cluster;
private Router router;
@@ -74,7 +81,6 @@ public class TestRefreshUserMappingsWithRouters {
private static final String ROUTER_NS = "rbfns";
private static final String HDFS_SCHEMA = "hdfs://";
private static final String LOOPBACK_ADDRESS = "127.0.0.1";
- private static final String HDFS_PREFIX = HDFS_SCHEMA + LOOPBACK_ADDRESS;
private String tempResource = null;
@@ -111,7 +117,7 @@ public class TestRefreshUserMappingsWithRouters {
public void setUp() {
conf = new Configuration(false);
conf.setClass("hadoop.security.group.mapping",
- TestRefreshUserMappingsWithRouters.MockUnixGroupsMapping.class,
+ TestRouterUserMappings.MockUnixGroupsMapping.class,
GroupMappingServiceProvider.class);
conf.setLong("hadoop.security.groups.cache.secs",
GROUP_REFRESH_TIMEOUT_SEC);
@@ -123,23 +129,6 @@ public class TestRefreshUserMappingsWithRouters {
}
/**
- * Setup a single router, and return this router's rpc address
- * as fs.defaultFS for {@link DFSAdmin}.
- * @return router's rpc address
- * @throws Exception
- */
- private String setUpSingleRouterAndReturnDefaultFs() {
- router = new Router();
- conf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY,
- LOOPBACK_ADDRESS + ":" + NetUtils.getFreeSocketPort());
- router.init(conf);
- router.start();
- String defaultFs = HDFS_PREFIX + ":" +
- router.getRpcServerAddress().getPort();
- return defaultFs;
- }
-
- /**
* Setup a multi-routers mini dfs cluster with two nameservices
* and four routers.
* For dfsadmin clients to use the federated namespace, we need to create a
@@ -178,28 +167,14 @@ public class TestRefreshUserMappingsWithRouters {
}
@Test
- public void testRefreshSuperUserGroupsConfigurationWithSingleRouter()
- throws Exception {
- testRefreshSuperUserGroupsConfigurationInternal(
- setUpSingleRouterAndReturnDefaultFs());
- }
-
- @Test
- public void testRefreshSuperUserGroupsConfigurationWithMultiRouters()
+ public void testRefreshSuperUserGroupsConfiguration()
throws Exception {
testRefreshSuperUserGroupsConfigurationInternal(
setUpMultiRoutersAndReturnDefaultFs());
}
@Test
- public void testGroupMappingRefreshWithSingleRouter() throws Exception {
- testGroupMappingRefreshInternal(
- setUpSingleRouterAndReturnDefaultFs());
- }
-
-
- @Test
- public void testGroupMappingRefreshWithMultiRouters() throws Exception {
+ public void testGroupMappingRefresh() throws Exception {
testGroupMappingRefreshInternal(
setUpMultiRoutersAndReturnDefaultFs());
}
@@ -282,6 +257,43 @@ public class TestRefreshUserMappingsWithRouters {
fail("second auth for " + ugi1.getShortUserName() +
" should've succeeded: " + e.getLocalizedMessage());
}
+
+ // get groups
+ testGroupsForUserCLI(conf, "user");
+ testGroupsForUserProtocol(conf, "user");
+ }
+
+ /**
+ * Use the command line to get the groups.
+ * @param config Configuration containing the default filesystem.
+ * @param username Username to check.
+ * @throws Exception If it cannot get the groups.
+ */
+ private void testGroupsForUserCLI(Configuration config, String username)
+ throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ PrintStream oldOut = System.out;
+ System.setOut(new PrintStream(out));
+ new GetGroups(config).run(new String[]{username});
+ assertTrue("Wrong output: " + out,
+ out.toString().startsWith(username + " : " + username));
+ out.reset();
+ System.setOut(oldOut);
+ }
+
+ /**
+ * Use the GetUserMappingsProtocol protocol to get the groups.
+ * @param config Configuration containing the default filesystem.
+ * @param username Username to check.
+ * @throws IOException If it cannot get the groups.
+ */
+ private void testGroupsForUserProtocol(Configuration config, String username)
+ throws IOException {
+ GetUserMappingsProtocol proto = NameNodeProxies.createProxy(
+ config, FileSystem.getDefaultUri(config),
+ GetUserMappingsProtocol.class).getProxy();
+ String[] groups = proto.getGroupsForUser(username);
+ assertArrayEquals(new String[] {"user1", "user2"}, groups);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org