You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2022/07/23 14:19:45 UTC
[hadoop] branch trunk updated: HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)
This is an automated email from the ASF dual-hosted git repository.
ferhui pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c963570517 HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)
2c963570517 is described below
commit 2c96357051718a519af7efbe748269977351fa89
Author: xuzq <15...@163.com>
AuthorDate: Sat Jul 23 22:19:37 2022 +0800
HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)
---
.../hadoop/io/retry/RetryInvocationHandler.java | 6 +-
.../java/org/apache/hadoop/ipc/CallerContext.java | 2 +
.../java/org/apache/hadoop/ipc/RetryCache.java | 50 ++++---
.../java/org/apache/hadoop/ipc/TestRetryCache.java | 23 ++--
.../server/federation/router/RouterRpcClient.java | 14 +-
.../federation/router/TestRouterRetryCache.java | 144 +++++++++++++++++++++
.../server/federation/router/TestRouterRpc.java | 37 ++++++
.../hadoop/hdfs/server/namenode/FSEditLog.java | 13 +-
.../hadoop/hdfs/server/namenode/NameNode.java | 91 +++++++++++++
.../hdfs/server/namenode/NameNodeRpcServer.java | 123 ++++++++----------
10 files changed, 395 insertions(+), 108 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 3960b189665..9707ee388e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -46,6 +46,10 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
public static final Logger LOG = LoggerFactory.getLogger(
RetryInvocationHandler.class);
+ @VisibleForTesting
+ public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
+ ThreadLocal.withInitial(() -> true);
+
static class Call {
private final Method method;
private final Object[] args;
@@ -159,7 +163,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
}
Object invokeMethod() throws Throwable {
- if (isRpc) {
+ if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
Client.setCallIdAndRetryCount(callId, counters.retries,
retryInvocationHandler.asyncCallHandler);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
index dbd9184a2b9..98d7e82c70e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
@@ -47,6 +47,8 @@ public final class CallerContext {
// field names
public static final String CLIENT_IP_STR = "clientIp";
public static final String CLIENT_PORT_STR = "clientPort";
+ public static final String CLIENT_ID_STR = "clientId";
+ public static final String CLIENT_CALL_ID_STR = "clientCallId";
/** The caller context.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
index 3d64a84bfb4..624cc08ac25 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
@@ -55,14 +55,14 @@ public class RetryCache {
/**
* Processing state of the requests.
*/
- private static byte INPROGRESS = 0;
- private static byte SUCCESS = 1;
- private static byte FAILED = 2;
+ private static final byte INPROGRESS = 0;
+ private static final byte SUCCESS = 1;
+ private static final byte FAILED = 2;
private byte state = INPROGRESS;
// Store uuid as two long for better memory utilization
- private final long clientIdMsb; // Most signficant bytes
+ private final long clientIdMsb; // Most significant bytes
private final long clientIdLsb; // Least significant bytes
private final int callId;
@@ -140,8 +140,8 @@ public class RetryCache {
@Override
public String toString() {
- return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
- + this.callId + ":" + this.state;
+ return String.format("%s:%s:%s", new UUID(this.clientIdMsb, this.clientIdLsb),
+ this.callId, this.state);
}
}
@@ -183,7 +183,7 @@ public class RetryCache {
private final LightWeightGSet<CacheEntry, CacheEntry> set;
private final long expirationTime;
- private String cacheName;
+ private final String cacheName;
private final ReentrantLock lock = new ReentrantLock();
@@ -195,7 +195,7 @@ public class RetryCache {
*/
public RetryCache(String cacheName, double percentage, long expirationTime) {
int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
- capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
+ capacity = Math.max(capacity, MAX_CAPACITY);
this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
expirationTime, 0);
this.expirationTime = expirationTime;
@@ -203,11 +203,11 @@ public class RetryCache {
this.retryCacheMetrics = RetryCacheMetrics.create(this);
}
- private static boolean skipRetryCache() {
+ private static boolean skipRetryCache(byte[] clientId, int callId) {
// Do not track non RPC invocation or RPC requests with
// invalid callId or clientId in retry cache
- return !Server.isRpcInvocation() || Server.getCallId() < 0
- || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
+ return !Server.isRpcInvocation() || callId < 0
+ || Arrays.equals(clientId, RpcConstants.DUMMY_CLIENT_ID);
}
public void lock() {
@@ -332,43 +332,51 @@ public class RetryCache {
retryCacheMetrics.incrCacheUpdated();
}
- private static CacheEntry newEntry(long expirationTime) {
- return new CacheEntry(Server.getClientId(), Server.getCallId(),
+ private static CacheEntry newEntry(long expirationTime,
+ byte[] clientId, int callId) {
+ return new CacheEntry(clientId, callId,
System.nanoTime() + expirationTime);
}
private static CacheEntryWithPayload newEntry(Object payload,
- long expirationTime) {
- return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
+ long expirationTime, byte[] clientId, int callId) {
+ return new CacheEntryWithPayload(clientId, callId,
payload, System.nanoTime() + expirationTime);
}
/**
* Static method that provides null check for retryCache.
* @param cache input Cache.
+ * @param clientId client id of this request
+ * @param callId client call id of this request
* @return CacheEntry.
*/
- public static CacheEntry waitForCompletion(RetryCache cache) {
- if (skipRetryCache()) {
+ public static CacheEntry waitForCompletion(RetryCache cache,
+ byte[] clientId, int callId) {
+ if (skipRetryCache(clientId, callId)) {
return null;
}
return cache != null ? cache
- .waitForCompletion(newEntry(cache.expirationTime)) : null;
+ .waitForCompletion(newEntry(cache.expirationTime,
+ clientId, callId)) : null;
}
/**
* Static method that provides null check for retryCache.
* @param cache input cache.
* @param payload input payload.
+ * @param clientId client id of this request
+ * @param callId client call id of this request
* @return CacheEntryWithPayload.
*/
public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
- Object payload) {
- if (skipRetryCache()) {
+ Object payload, byte[] clientId, int callId) {
+ if (skipRetryCache(clientId, callId)) {
return null;
}
return (CacheEntryWithPayload) (cache != null ? cache
- .waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
+ .waitForCompletion(newEntry(payload, cache.expirationTime,
+ clientId, callId)) : null);
}
public static void setState(CacheEntry e, boolean success) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
index 64607deb908..b789ada5271 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
@@ -50,14 +50,14 @@ public class TestRetryCache {
static class TestServer {
AtomicInteger retryCount = new AtomicInteger();
AtomicInteger operationCount = new AtomicInteger();
- private RetryCache retryCache = new RetryCache("TestRetryCache", 1,
- 100 * 1000 * 1000 * 1000L);
+ private final RetryCache retryCache = new RetryCache(
+ "TestRetryCache", 1, 100 * 1000 * 1000 * 1000L);
/**
* A server method implemented using {@link RetryCache}.
*
* @param input is returned back in echo, if {@code success} is true.
- * @param failureOuput returned on failure, if {@code success} is false.
+ * @param failureOutput returned on failure, if {@code success} is false.
* @param methodTime time taken by the operation. By passing smaller/larger
* value one can simulate an operation that takes short/long time.
* @param success whether this operation completes successfully or not
@@ -67,7 +67,7 @@ public class TestRetryCache {
int echo(int input, int failureOutput, long methodTime, boolean success)
throws InterruptedException {
CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache,
- null);
+ null, Server.getClientId(), Server.getCallId());
if (entry != null && entry.isSuccess()) {
System.out.println("retryCount incremented " + retryCount.get());
retryCount.incrementAndGet();
@@ -173,16 +173,13 @@ public class TestRetryCache {
final int failureOutput = input + 1;
ExecutorService executorService = Executors
.newFixedThreadPool(numberOfThreads);
- List<Future<Integer>> list = new ArrayList<Future<Integer>>();
+ List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < numberOfThreads; i++) {
- Callable<Integer> worker = new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- Server.getCurCall().set(call);
- Assert.assertEquals(Server.getCurCall().get(), call);
- int randomPause = pause == 0 ? pause : r.nextInt(pause);
- return testServer.echo(input, failureOutput, randomPause, success);
- }
+ Callable<Integer> worker = () -> {
+ Server.getCurCall().set(call);
+ Assert.assertEquals(Server.getCurCall().get(), call);
+ int randomPause = pause == 0 ? pause : r.nextInt(pause);
+ return testServer.echo(input, failureOutput, randomPause, success);
};
Future<Integer> submit = executorService.submit(worker);
list.add(submit);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index ff90854ebb7..e90cc5fda41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -464,7 +465,7 @@ public class RouterRpcClient {
+ router.getRouterId());
}
- addClientIpToCallerContext();
+ addClientInfoToCallerContext();
Object ret = null;
if (rpcMonitor != null) {
@@ -584,12 +585,13 @@ public class RouterRpcClient {
}
/**
- * For tracking which is the actual client address.
- * It adds trace info "clientIp:ip" and "clientPort:port"
+ * For tracking some information about the actual client.
+ * It adds trace info "clientIp:ip", "clientPort:port",
+ * "clientId:id" and "clientCallId:callId"
* in the caller context, removing the old values if they were
* already present.
*/
- private void addClientIpToCallerContext() {
+ private void addClientInfoToCallerContext() {
CallerContext ctx = CallerContext.getCurrent();
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
@@ -598,6 +600,10 @@ public class RouterRpcClient {
.append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
.append(CallerContext.CLIENT_PORT_STR,
Integer.toString(Server.getRemotePort()))
+ .append(CallerContext.CLIENT_ID_STR,
+ StringUtils.byteToHexString(Server.getClientId()))
+ .append(CallerContext.CLIENT_CALL_ID_STR,
+ Integer.toString(Server.getCallId()))
.setSignature(origSignature);
// Append the original caller context
if (origContext != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java
new file mode 100644
index 00000000000..46a23549797
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterRetryCache {
+ /** Federated HDFS cluster. */
+ private MiniRouterDFSCluster cluster;
+
+ @Before
+ public void setup() throws Exception {
+ Configuration namenodeConf = new Configuration();
+ namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
+ cluster = new MiniRouterDFSCluster(true, 1);
+ cluster.addNamenodeOverrides(namenodeConf);
+
+ // Start NNs and DNs and wait until ready
+ cluster.startCluster();
+
+ // Start routers with only an RPC service
+ cluster.startRouters();
+
+ // Register and verify all NNs with all routers
+ cluster.registerNamenodes();
+ cluster.waitNamenodeRegistration();
+
+ // Setup the mount table
+ cluster.installMockLocations();
+
+ // Making one Namenodes active per nameservice
+ if (cluster.isHighAvailability()) {
+ for (String ns : cluster.getNameservices()) {
+ cluster.switchToActive(ns, NAMENODES[0]);
+ cluster.switchToStandby(ns, NAMENODES[1]);
+ }
+ }
+ cluster.waitActiveNamespaces();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test
+ public void testRetryCache() throws Exception {
+ RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
+ FileSystem routerFS = cluster.getRandomRouter().getFileSystem();
+ Path testDir = new Path("/target-ns0/testdir");
+ routerFS.mkdirs(testDir);
+ routerFS.setPermission(testDir, FsPermission.getDefault());
+
+ // Run as fake joe to authorize the test
+ UserGroupInformation joe =
+ UserGroupInformation.createUserForTesting("fake_joe",
+ new String[]{"fake_group"});
+ FileSystem joeFS = joe.doAs(
+ (PrivilegedExceptionAction<FileSystem>) () ->
+ FileSystem.newInstance(routerFS.getUri(), routerFS.getConf()));
+
+ Path renameSrc = new Path(testDir, "renameSrc");
+ Path renameDst = new Path(testDir, "renameDst");
+ joeFS.mkdirs(renameSrc);
+
+ assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+ cluster.getCluster().getNamesystem(0).getState());
+
+ int callId = Client.nextCallId();
+ Client.setCallIdAndRetryCount(callId, 0, null);
+ assertTrue(joeFS.rename(renameSrc, renameDst));
+
+ Client.setCallIdAndRetryCount(callId, 0, null);
+ assertTrue(joeFS.rename(renameSrc, renameDst));
+
+ String ns0 = cluster.getNameservices().get(0);
+ cluster.switchToStandby(ns0, NAMENODES[0]);
+ cluster.switchToActive(ns0, NAMENODES[1]);
+
+ assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+ cluster.getCluster().getNamesystem(1).getState());
+
+ Client.setCallIdAndRetryCount(callId, 0, null);
+ assertTrue(joeFS.rename(renameSrc, renameDst));
+ }
+
+ @Test
+ public void testParseSpecialValue() {
+ String mockContent = "mockContent,clientIp:127.0.0.1," +
+ "clientCallId:12345,clientId:mockClientId";
+ String clientIp = NameNode.parseSpecialValue(mockContent, "clientIp:");
+ assertEquals("127.0.0.1", clientIp);
+
+ String clientCallId = NameNode.parseSpecialValue(
+ mockContent, "clientCallId:");
+ assertEquals("12345", clientCallId);
+
+ String clientId = NameNode.parseSpecialValue(mockContent, "clientId:");
+ assertEquals("mockClientId", clientId);
+
+ String clientRetryNum = NameNode.parseSpecialValue(
+ mockContent, "clientRetryNum:");
+ assertNull(clientRetryNum);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index f71145a4522..31cc18fc882 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -2053,6 +2053,8 @@ public class TestRouterRpc {
final String logOutput = auditlog.getOutput();
assertTrue(logOutput.contains("callerContext=clientIp:"));
assertTrue(logOutput.contains(",clientContext"));
+ assertTrue(logOutput.contains(",clientId"));
+ assertTrue(logOutput.contains(",clientCallId"));
assertTrue(verifyFileExists(routerFS, dirPath));
}
@@ -2103,6 +2105,41 @@ public class TestRouterRpc {
assertFalse(auditLog.getOutput().contains("clientPort:1234"));
}
+ @Test
+ public void testAddClientIdAndCallIdToCallerContext() throws IOException {
+ GenericTestUtils.LogCapturer auditLog =
+ GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+ // 1. ClientId and ClientCallId are not set on the client.
+ // Set client context.
+ CallerContext.setCurrent(
+ new CallerContext.Builder("clientContext").build());
+
+ // Create a directory via the router.
+ String dirPath = "/test";
+ routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
+
+ // The audit log should contains "clientId:" and "clientCallId:".
+ assertTrue(auditLog.getOutput().contains("clientId:"));
+ assertTrue(auditLog.getOutput().contains("clientCallId:"));
+ assertTrue(verifyFileExists(routerFS, dirPath));
+ auditLog.clearOutput();
+
+ // 2. ClientId and ClientCallId are set on the client.
+ // Reset client context.
+ CallerContext.setCurrent(
+ new CallerContext.Builder(
+ "clientContext,clientId:mockClientId,clientCallId:4321").build());
+
+ // Create a directory via the router.
+ routerProtocol.getFileInfo(dirPath);
+
+ // The audit log should not contain the original clientId and clientCallId
+ // set by client.
+ assertFalse(auditLog.getOutput().contains("clientId:mockClientId"));
+ assertFalse(auditLog.getOutput().contains("clientCallId:4321"));
+ }
+
@Test
public void testContentSummaryWithSnapshot() throws Exception {
DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 850b2fc5708..5bb6872e588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.monotonicNow;
@@ -30,6 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -107,7 +109,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Lists;
@@ -195,6 +196,9 @@ public class FSEditLog implements LogsPurgeable {
protected final OpInstanceCache cache = new OpInstanceCache();
+ // Users who can override the client ip
+ private final String[] ipProxyUsers;
+
/**
* The edit directories that are shared between primary and secondary.
*/
@@ -246,6 +250,7 @@ public class FSEditLog implements LogsPurgeable {
* @param editsDirs List of journals to use
*/
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
+ ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
isSyncRunning = false;
this.conf = conf;
this.storage = storage;
@@ -799,8 +804,10 @@ public class FSEditLog implements LogsPurgeable {
/** Record the RPC IDs if necessary */
private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
if (toLogRpcIds) {
- op.setRpcClientId(Server.getClientId());
- op.setRpcCallId(Server.getCallId());
+ Pair<byte[], Integer> clientIdAndCallId =
+ NameNode.getClientIdAndCallId(this.ipProxyUsers);
+ op.setRpcClientId(clientIdAndCallId.getLeft());
+ op.setRpcCallId(clientIdAndCallId.getRight());
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 0610048ccac..63c7721b749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
@@ -494,6 +497,94 @@ public class NameNode extends ReconfigurableBase implements
return metrics;
}
+ /**
+ * Try to obtain the actual client info according to the current user.
+ * @param ipProxyUsers Users who can override client infos
+ */
+ private static String clientInfoFromContext(
+ final String[] ipProxyUsers) {
+ if (ipProxyUsers != null) {
+ UserGroupInformation user =
+ UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
+ if (user != null &&
+ ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
+ CallerContext context = CallerContext.getCurrent();
+ if (context != null && context.isContextValid()) {
+ return context.getContext();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Try to obtain the value corresponding to the key by parsing the content.
+ * @param content the full content to be parsed.
+ * @param key trying to obtain the value of the key.
+ * @return the value corresponding to the key.
+ */
+ @VisibleForTesting
+ public static String parseSpecialValue(String content, String key) {
+ int posn = content.indexOf(key);
+ if (posn != -1) {
+ posn += key.length();
+ int end = content.indexOf(",", posn);
+ return end == -1 ? content.substring(posn)
+ : content.substring(posn, end);
+ }
+ return null;
+ }
+
+ /**
+ * Try to obtain the actual client's machine according to the current user.
+ * @param ipProxyUsers Users who can override client infos.
+ * @return The actual client's machine.
+ */
+ public static String getClientMachine(final String[] ipProxyUsers) {
+ String cc = clientInfoFromContext(ipProxyUsers);
+ if (cc != null) {
+ // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
+ // return "1.2.3.4" as the client machine.
+ String key = CallerContext.CLIENT_IP_STR +
+ CallerContext.Builder.KEY_VALUE_SEPARATOR;
+ return parseSpecialValue(cc, key);
+ }
+
+ String clientMachine = Server.getRemoteAddress();
+ if (clientMachine == null) { //not a RPC client
+ clientMachine = "";
+ }
+ return clientMachine;
+ }
+
+ /**
+ * Try to obtain the actual client's id and call id
+ * according to the current user.
+ * @param ipProxyUsers Users who can override client infos
+ * @return The actual client's id and call id.
+ */
+ public static Pair<byte[], Integer> getClientIdAndCallId(
+ final String[] ipProxyUsers) {
+ byte[] clientId = Server.getClientId();
+ int callId = Server.getCallId();
+ String cc = clientInfoFromContext(ipProxyUsers);
+ if (cc != null) {
+ String clientIdKey = CallerContext.CLIENT_ID_STR +
+ CallerContext.Builder.KEY_VALUE_SEPARATOR;
+ String clientIdStr = parseSpecialValue(cc, clientIdKey);
+ if (clientIdStr != null) {
+ clientId = StringUtils.hexStringToByte(clientIdStr);
+ }
+ String callIdKey = CallerContext.CLIENT_CALL_ID_STR +
+ CallerContext.Builder.KEY_VALUE_SEPARATOR;
+ String callIdStr = parseSpecialValue(cc, callIdKey);
+ if (callIdStr != null) {
+ callId = Integer.parseInt(callIdStr);
+ }
+ }
+ return Pair.of(clientId, callId);
+ }
+
/**
* Returns object used for reporting namenode startup progress.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1d50bc5cb53..b64530337ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -46,8 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.ipc.CallerContext;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -271,7 +270,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
private final String defaultECPolicyName;
- // Users who can override the client ip
+ // Users who can override the client info
private final String[] ipProxyUsers;
public NameNodeRpcServer(Configuration conf, NameNode nn)
@@ -711,8 +710,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if(!nn.isRole(NamenodeRole.NAMENODE))
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
+ CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (NamenodeCommand) cacheEntry.getPayload();
}
@@ -725,13 +723,33 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return ret;
}
+ /**
+ * Return the current CacheEntry.
+ */
+ private CacheEntry getCacheEntry() {
+ Pair<byte[], Integer> clientInfo =
+ NameNode.getClientIdAndCallId(this.ipProxyUsers);
+ return RetryCache.waitForCompletion(
+ retryCache, clientInfo.getLeft(), clientInfo.getRight());
+ }
+
+ /**
+ * Return the current CacheEntryWithPayload.
+ */
+ private CacheEntryWithPayload getCacheEntryWithPayload(Object payload) {
+ Pair<byte[], Integer> clientInfo =
+ NameNode.getClientIdAndCallId(this.ipProxyUsers);
+ return RetryCache.waitForCompletion(retryCache, payload,
+ clientInfo.getLeft(), clientInfo.getRight());
+ }
+
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
String operationName = "endCheckpoint";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -801,7 +819,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+ CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
@@ -832,8 +850,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+src+" for "+clientName+" at "+clientMachine);
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
+ CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
@@ -999,7 +1016,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -1044,7 +1061,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@@ -1067,7 +1084,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* NameNode.concat: src path {} to" +
" target path {}", Arrays.toString(src), trg);
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -1093,7 +1110,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -1130,7 +1147,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ ", recursive=" + recursive);
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@@ -1315,7 +1332,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
checkNNStartup();
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return true; // Return previous response
}
@@ -1503,7 +1520,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void satisfyStoragePolicy(String src) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -1550,7 +1567,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
boolean createParent) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -1920,34 +1937,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
}
+ /**
+ * Get the actual client's machine.
+ */
private String getClientMachine() {
- if (ipProxyUsers != null) {
- // Get the real user (or effective if it isn't a proxy user)
- UserGroupInformation user =
- UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
- if (user != null &&
- ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
- CallerContext context = CallerContext.getCurrent();
- if (context != null && context.isContextValid()) {
- String cc = context.getContext();
- // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
- // return "1.2.3.4" as the client machine.
- String key = CallerContext.CLIENT_IP_STR +
- CallerContext.Builder.KEY_VALUE_SEPARATOR;
- int posn = cc.indexOf(key);
- if (posn != -1) {
- posn += key.length();
- int end = cc.indexOf(",", posn);
- return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
- }
- }
- }
- }
- String clientMachine = Server.getRemoteAddress();
- if (clientMachine == null) { //not a RPC client
- clientMachine = "";
- }
- return clientMachine;
+ return NameNode.getClientMachine(this.ipProxyUsers);
}
@Override
@@ -1967,8 +1961,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
+ CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (String) cacheEntry.getPayload();
}
@@ -1995,7 +1988,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrDeleteSnapshotOps();
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2037,7 +2030,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
namesystem.checkOperation(OperationCategory.WRITE);
metrics.incrRenameSnapshotOps();
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2098,8 +2091,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
- (retryCache, null);
+ CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (Long) cacheEntry.getPayload();
}
@@ -2120,7 +2112,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2138,7 +2130,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void removeCacheDirective(long id) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2165,7 +2157,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void addCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2182,7 +2174,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2199,7 +2191,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void removeCachePool(String cachePoolName) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2262,7 +2254,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2294,7 +2286,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
final ReencryptAction action) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2318,7 +2310,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void setErasureCodingPolicy(String src, String ecPolicyName)
throws IOException {
checkNNStartup();
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2342,7 +2334,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2372,7 +2364,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
public void removeXAttr(String src, XAttr xAttr) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return; // Return previous response
}
@@ -2555,7 +2547,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void unsetErasureCodingPolicy(String src) throws IOException {
checkNNStartup();
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2581,8 +2573,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
String operationName = "addErasureCodingPolicies";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
- final CacheEntryWithPayload cacheEntry =
- RetryCache.waitForCompletion(retryCache, null);
+ final CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (AddErasureCodingPolicyResponse[]) cacheEntry.getPayload();
}
@@ -2605,7 +2596,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
String operationName = "removeErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2624,7 +2615,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
String operationName = "enableErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
@@ -2643,7 +2634,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
String operationName = "disableErasureCodingPolicy";
checkNNStartup();
namesystem.checkSuperuserPrivilege(operationName);
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ final CacheEntry cacheEntry = getCacheEntry();
if (cacheEntry != null && cacheEntry.isSuccess()) {
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org