You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/12/01 23:37:30 UTC
[13/46] hadoop git commit: HDFS-8855. Webhdfs client leaks active
NameNode connections. Contributed by Xiaobing Zhou.
HDFS-8855. Webhdfs client leaks active NameNode connections. Contributed by Xiaobing Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fe5624b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fe5624b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fe5624b8
Branch: refs/heads/HDFS-1312
Commit: fe5624b85d71720ae9da90a01cad9a3d1ea41160
Parents: e8a87d7
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Nov 24 12:41:08 2015 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Nov 24 12:47:57 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/security/token/Token.java | 10 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../server/datanode/web/DatanodeHttpServer.java | 4 +-
.../web/webhdfs/DataNodeUGIProvider.java | 106 +++++++--
.../src/main/resources/hdfs-default.xml | 8 +
.../web/webhdfs/TestDataNodeUGIProvider.java | 235 +++++++++++++++++++
7 files changed, 350 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index 2420155..f8b7355 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.security.token;
import com.google.common.collect.Maps;
+import com.google.common.primitives.Bytes;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import java.io.*;
import java.util.Arrays;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.UUID;
/**
* The client-side form of the token.
@@ -337,7 +340,12 @@ public class Token<T extends TokenIdentifier> implements Writable {
identifierToString(buffer);
return buffer.toString();
}
-
+
+  /**
+   * Builds a key that identifies this token in caches.
+   *
+   * The key is the string form of a name-based UUID computed over the
+   * concatenation of the token kind, identifier and password bytes, in that
+   * order.
+   *
+   * @return the cache key for this token
+   */
+  public String buildCacheKey() {
+    // NOTE(review): assumes kind is an org.apache.hadoop.io.Text -- confirm.
+    // Text#getBytes() hands out the whole backing array, of which only the
+    // first getLength() bytes are valid; copyBytes() returns exactly those,
+    // so stale trailing bytes cannot leak into the key.
+    return UUID.nameUUIDFromBytes(
+        Bytes.concat(kind.copyBytes(), identifier, password)).toString();
+  }
+
private static ServiceLoader<TokenRenewer> renewers =
ServiceLoader.load(TokenRenewer.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 95dfbcf..7d9df2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2373,6 +2373,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
(Wei-Chiu Chuang via cnauroth)
+ HDFS-8855. Webhdfs client leaks active NameNode connections.
+ (Xiaobing Zhou via xyao)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 54e0d10..6986896 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -70,6 +70,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
"dfs.webhdfs.netty.high.watermark";
public static final int DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
+ public static final String DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY =
+ "dfs.webhdfs.ugi.expire.after.access";
+ public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
+ 10*60*1000; //10 minutes
// HA related configuration
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
index 441d520..fc24fae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.net.NetUtils;
@@ -74,7 +75,6 @@ public class DatanodeHttpServer implements Closeable {
private final Configuration confForCreate;
private InetSocketAddress httpAddress;
private InetSocketAddress httpsAddress;
-
static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
public DatanodeHttpServer(final Configuration conf,
@@ -99,7 +99,7 @@ public class DatanodeHttpServer implements Closeable {
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport",
BlockScanner.Servlet.class);
-
+ DataNodeUGIProvider.init(conf);
this.infoServer.start();
final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
index ea1c29f..233ba69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/DataNodeUGIProvider.java
@@ -13,50 +13,103 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
/**
* Create UGI from the request for the WebHDFS requests for the DNs. Note that
* the DN does not authenticate the UGI -- the NN will authenticate them in
* subsequent operations.
*/
-class DataNodeUGIProvider {
+public class DataNodeUGIProvider {
private final ParameterParser params;
+ @VisibleForTesting
+ static Cache<String, UserGroupInformation> ugiCache;
+  // Log under this class's own category.
+  public static final Log LOG = LogFactory.getLog(DataNodeUGIProvider.class);
DataNodeUGIProvider(ParameterParser params) {
this.params = params;
}
- UserGroupInformation ugi() throws IOException {
- if (UserGroupInformation.isSecurityEnabled()) {
- return tokenUGI();
+ /**
+ * Creates the shared UGI cache if it has not been created yet.
+ *
+ * Entries expire after they have not been accessed for the number of
+ * milliseconds configured under
+ * DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY (falling back to
+ * DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT). Once the
+ * cache exists, further calls leave it untouched, so the configuration
+ * passed to a later call is not applied.
+ *
+ * @param conf configuration the expiry interval is read from
+ */
+ public static synchronized void init(Configuration conf) {
+ if (ugiCache == null) {
+ ugiCache = CacheBuilder
+ .newBuilder()
+ .expireAfterAccess(
+ conf.getInt(
+ DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+ DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT),
+ TimeUnit.MILLISECONDS).build();
}
+ }
- final String usernameFromQuery = params.userName();
- final String doAsUserFromQuery = params.doAsUser();
- final String remoteUser = usernameFromQuery == null
- ? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
- // request
- : usernameFromQuery;
+ /**
+ * Returns the UGI for the current request, served from the shared cache.
+ *
+ * When security is enabled the cache key is derived from the request's
+ * delegation token and a missing entry is loaded through tokenUGI().
+ * Otherwise the key is derived from the remote user and the optional doAs
+ * user, and a missing entry is loaded through nonTokenUGI(). The remote
+ * user falls back to JspHelper.getDefaultWebUserName() when the request
+ * does not name one.
+ *
+ * @return the cached or newly created UGI
+ * @throws IOException if the loader failed; an IOException raised by the
+ * loader is rethrown as is, any other cause is wrapped
+ */
+ UserGroupInformation ugi() throws IOException {
+ UserGroupInformation ugi;
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
- JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
- if (doAsUserFromQuery != null) {
- // create and attempt to authorize a proxy user
- ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+ try {
+ if (UserGroupInformation.isSecurityEnabled()) {
+ final Token<DelegationTokenIdentifier> token = params.delegationToken();
+
+ ugi = ugiCache.get(buildTokenCacheKey(token),
+ new Callable<UserGroupInformation>() {
+ @Override
+ public UserGroupInformation call() throws Exception {
+ return tokenUGI(token);
+ }
+ });
+ } else {
+ final String usernameFromQuery = params.userName();
+ final String doAsUserFromQuery = params.doAsUser();
+ final String remoteUser = usernameFromQuery == null ? JspHelper
+ .getDefaultWebUserName(params.conf()) // not specified in request
+ : usernameFromQuery;
+
+ ugi = ugiCache.get(
+ buildNonTokenCacheKey(doAsUserFromQuery, remoteUser),
+ new Callable<UserGroupInformation>() {
+ @Override
+ public UserGroupInformation call() throws Exception {
+ return nonTokenUGI(usernameFromQuery, doAsUserFromQuery,
+ remoteUser);
+ }
+ });
+ }
+ } catch (ExecutionException e) {
+ // The cache wraps loader failures; surface the underlying cause.
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw new IOException(cause);
+ }
}
+
return ugi;
}
- private UserGroupInformation tokenUGI() throws IOException {
- Token<DelegationTokenIdentifier> token = params.delegationToken();
+ /**
+ * Returns the cache key for a delegation token by delegating to
+ * Token.buildCacheKey().
+ *
+ * @param token delegation token taken from the request
+ * @return the cache key for the token
+ */
+ private String buildTokenCacheKey(Token<DelegationTokenIdentifier> token) {
+ return token.buildCacheKey();
+ }
+
+ private UserGroupInformation tokenUGI(Token<DelegationTokenIdentifier> token)
+ throws IOException {
ByteArrayInputStream buf =
new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -67,4 +120,23 @@ class DataNodeUGIProvider {
return ugi;
}
+  /**
+   * Builds the cache key used when the request carries no delegation token.
+   *
+   * @param doAsUserFromQuery doAs user named in the request, or null
+   * @param remoteUser name of the remote user
+   * @return "{remoteUser}" when there is no doAs user, otherwise
+   *         "{remoteUser}:{doAsUser}"
+   */
+  private String buildNonTokenCacheKey(String doAsUserFromQuery,
+      String remoteUser) throws IOException {
+    if (doAsUserFromQuery != null) {
+      return String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
+    }
+    return String.format("{%s}", remoteUser);
+  }
+
+  /**
+   * Creates the UGI for a request that carries no delegation token.
+   *
+   * A remote-user UGI is created for remoteUser and its short name is checked
+   * against the user name given in the query. When a doAs user is present the
+   * result is a proxy UGI for that user on top of the remote-user UGI.
+   *
+   * @param usernameFromQuery user name from the request, possibly null
+   * @param doAsUserFromQuery doAs user from the request, or null
+   * @param remoteUser name the remote-user UGI is created for
+   * @return the remote-user UGI, or a proxy UGI when a doAs user is given
+   * @throws IOException if the user name check fails
+   */
+  private UserGroupInformation nonTokenUGI(String usernameFromQuery,
+      String doAsUserFromQuery, String remoteUser) throws IOException {
+    final UserGroupInformation remoteUgi =
+        UserGroupInformation.createRemoteUser(remoteUser);
+    JspHelper.checkUsername(remoteUgi.getShortUserName(), usernameFromQuery);
+    if (doAsUserFromQuery == null) {
+      return remoteUgi;
+    }
+    // doAs request: build the proxy user on top of the remote user
+    return UserGroupInformation.createProxyUser(doAsUserFromQuery, remoteUgi);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 70dc56a..24371df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2482,6 +2482,14 @@
</property>
<property>
+ <name>dfs.webhdfs.ugi.expire.after.access</name>
+ <value>600000</value>
+ <description>How long in milliseconds after the last access the cached
+ UGI will expire. With 0, entries expire immediately (caching is disabled).
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
<value>10000</value>
<description>Number of blocks to rescan for each iteration of
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fe5624b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
new file mode 100644
index 0000000..bce5422
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/TestDataNodeUGIProvider.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
+
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.mockito.Mockito.mock;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.resources.DelegationParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests the UGI cache of {@link DataNodeUGIProvider}: once with security
+ * switched on, where requests carry delegation tokens, and once with plain
+ * user names.
+ */
+public class TestDataNodeUGIProvider {
+  private final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://"
+      + "127.0.0.1:0");
+  private static final String PATH = "/foo";
+  private static final int OFFSET = 42;
+  private static final int LENGTH = 512;
+  private static final int EXPIRE_AFTER_ACCESS = 5*1000;
+  private Configuration conf;
+
+  /**
+   * Creates a fresh configuration with a short cache expiry and makes sure
+   * the UGI cache exists.
+   */
+  @Before
+  public void setUp(){
+    conf = WebHdfsTestUtil.createConf();
+    conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
+        EXPIRE_AFTER_ACCESS);
+    // testUGICacheSecure switches the process-wide UGI configuration to
+    // KERBEROS and never switches it back. Re-apply the freshly created conf
+    // so that every test starts from the same state, whatever order the
+    // test methods run in.
+    UserGroupInformation.setConfiguration(conf);
+    DataNodeUGIProvider.init(conf);
+  }
+
+  /**
+   * With security enabled, requests with the same delegation token share a
+   * cached UGI, different tokens get different UGIs, and expired entries are
+   * replaced by new UGIs.
+   */
+  @Test
+  public void testUGICacheSecure() throws Exception {
+    // fake turning on security so api thinks it should use tokens
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("test-user");
+    ugi.setAuthenticationMethod(KERBEROS);
+    ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
+    UserGroupInformation.setLoginUser(ugi);
+
+    List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList();
+    getWebHdfsFileSystem(ugi, conf, tokens);
+
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(0).encodeToUrlString()));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
+            new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
+            new DelegationParam(tokens.get(1).encodeToUrlString()));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        ugi11, ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
+    UserGroupInformation ugi21 = ugiProvider2.ugi();
+    UserGroupInformation ugi22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs returned by the same token should be same",
+        ugi21, ugi22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, ugi22);
+
+    awaitCacheEmptyDueToExpiration();
+    ugi12 = ugiProvider1.ugi();
+    ugi22 = ugiProvider2.ugi();
+
+    String msg = "With cache eviction, two UGIs returned" +
+        " by the same token should not be same";
+    Assert.assertNotEquals(msg, ugi11, ugi12);
+    Assert.assertNotEquals(msg, ugi21, ugi22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different token should not be same",
+        ugi11, ugi22);
+  }
+
+  /**
+   * Without delegation tokens, requests for the same user share a cached UGI,
+   * different users get different UGIs, and expired entries are replaced by
+   * new UGIs.
+   */
+  @Test
+  public void testUGICacheInSecure() throws Exception {
+    String uri1 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("root"));
+
+    String uri2 = WebHdfsFileSystem.PATH_PREFIX
+        + PATH
+        + "?op=OPEN"
+        + Param.toSortedString("&", new OffsetParam((long) OFFSET),
+            new LengthParam((long) LENGTH), new UserParam("hdfs"));
+
+    DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
+    UserGroupInformation ugi11 = ugiProvider1.ugi();
+    UserGroupInformation ugi12 = ugiProvider1.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", ugi11,
+        ugi12);
+
+    DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
+        new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
+    UserGroupInformation ugi21 = ugiProvider2.ugi();
+    UserGroupInformation ugi22 = ugiProvider2.ugi();
+
+    Assert.assertEquals(
+        "With UGI cache, two UGIs for the same user should be same", ugi21,
+        ugi22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, ugi22);
+
+    awaitCacheEmptyDueToExpiration();
+    ugi12 = ugiProvider1.ugi();
+    ugi22 = ugiProvider2.ugi();
+
+    String msg = "With cache eviction, two UGIs returned by" +
+        " the same user should not be same";
+    Assert.assertNotEquals(msg, ugi11, ugi12);
+    Assert.assertNotEquals(msg, ugi21, ugi22);
+
+    Assert.assertNotEquals(
+        "With UGI cache, two UGIs for the different user should not be same",
+        ugi11, ugi22);
+  }
+
+  /**
+   * Wait for expiration of entries from the UGI cache. We need to be careful
+   * not to touch the entries in the cache while we're waiting for expiration.
+   * If we did, then that would reset the clock on expiration for those entries.
+   * Instead, we trigger internal clean-up of the cache and check for size 0.
+   *
+   * @throws Exception if there is any error
+   */
+  private void awaitCacheEmptyDueToExpiration() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DataNodeUGIProvider.ugiCache.cleanUp();
+        return DataNodeUGIProvider.ugiCache.size() == 0;
+      }
+    }, EXPIRE_AFTER_ACCESS, 10 * EXPIRE_AFTER_ACCESS);
+  }
+
+  /**
+   * Returns a WebHDFS file system for the test URI. When security is enabled,
+   * two delegation tokens for the given user are created first, appended to
+   * {@code tokens} and added to {@code ugi}.
+   *
+   * @param ugi user the tokens are issued for and added to
+   * @param conf configuration used to obtain the file system
+   * @param tokens list that receives the two created tokens
+   * @return the file system obtained for the test URI
+   * @throws IOException if the tokens or the file system cannot be created
+   */
+  private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
+      Configuration conf, List<Token<DelegationTokenIdentifier>> tokens)
+      throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
+          ugi.getUserName()), null, null);
+      FSNamesystem namesystem = mock(FSNamesystem.class);
+      DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
+          86400000, 86400000, 86400000, 86400000, namesystem);
+      dtSecretManager.startThreads();
+      Token<DelegationTokenIdentifier> token1 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      Token<DelegationTokenIdentifier> token2 = new Token<DelegationTokenIdentifier>(
+          dtId, dtSecretManager);
+      SecurityUtil.setTokenService(token1,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      SecurityUtil.setTokenService(token2,
+          NetUtils.createSocketAddr(uri.getAuthority()));
+      token1.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+      token2.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
+
+      tokens.add(token1);
+      tokens.add(token2);
+
+      ugi.addToken(token1);
+      ugi.addToken(token2);
+    }
+    return (WebHdfsFileSystem) FileSystem.get(uri, conf);
+  }
+}