Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2022/07/23 14:19:45 UTC

[hadoop] branch trunk updated: HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)

This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c963570517 HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)
2c963570517 is described below

commit 2c96357051718a519af7efbe748269977351fa89
Author: xuzq <15...@163.com>
AuthorDate: Sat Jul 23 22:19:37 2022 +0800

    HDFS-15079. RBF: Namenode needs to use the actual client Id and callId when going through RBF proxy. (#4530)
---
 .../hadoop/io/retry/RetryInvocationHandler.java    |   6 +-
 .../java/org/apache/hadoop/ipc/CallerContext.java  |   2 +
 .../java/org/apache/hadoop/ipc/RetryCache.java     |  50 ++++---
 .../java/org/apache/hadoop/ipc/TestRetryCache.java |  23 ++--
 .../server/federation/router/RouterRpcClient.java  |  14 +-
 .../federation/router/TestRouterRetryCache.java    | 144 +++++++++++++++++++++
 .../server/federation/router/TestRouterRpc.java    |  37 ++++++
 .../hadoop/hdfs/server/namenode/FSEditLog.java     |  13 +-
 .../hadoop/hdfs/server/namenode/NameNode.java      |  91 +++++++++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    | 123 ++++++++----------
 10 files changed, 395 insertions(+), 108 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
index 3960b189665..9707ee388e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
@@ -46,6 +46,10 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
   public static final Logger LOG = LoggerFactory.getLogger(
       RetryInvocationHandler.class);
 
+  @VisibleForTesting
+  public static final ThreadLocal<Boolean> SET_CALL_ID_FOR_TEST =
+      ThreadLocal.withInitial(() -> true);
+
   static class Call {
     private final Method method;
     private final Object[] args;
@@ -159,7 +163,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
     }
 
     Object invokeMethod() throws Throwable {
-      if (isRpc) {
+      if (isRpc && SET_CALL_ID_FOR_TEST.get()) {
         Client.setCallIdAndRetryCount(callId, counters.retries,
             retryInvocationHandler.asyncCallHandler);
       }
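
For context beyond this hunk: invokeMethod() normally stamps every proxied RPC with the handler's own call id, which would defeat a test that wants to replay a fixed id. The new thread-local suppresses that stamping. A minimal sketch of the intended test usage, assuming a client proxy wrapped by RetryInvocationHandler (TestRouterRetryCache further below does exactly this):

    import org.apache.hadoop.io.retry.RetryInvocationHandler;
    import org.apache.hadoop.ipc.Client;

    // Keep invokeMethod() from overwriting the call id on this thread.
    RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
    int callId = Client.nextCallId();
    Client.setCallIdAndRetryCount(callId, 0, null);
    // RPCs issued from this thread now carry the pinned callId, so
    // re-issuing the same call exercises the server-side retry cache.
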
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
index dbd9184a2b9..98d7e82c70e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
@@ -47,6 +47,8 @@ public final class CallerContext {
   // field names
   public static final String CLIENT_IP_STR = "clientIp";
   public static final String CLIENT_PORT_STR = "clientPort";
+  public static final String CLIENT_ID_STR = "clientId";
+  public static final String CLIENT_CALL_ID_STR = "clientCallId";
 
   /** The caller context.
    *
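
The two new keys join clientIp/clientPort as reserved caller-context fields. A sketch of how a trusted proxy populates them (the values here are fabricated; RouterRpcClient below uses the real Server thread-locals):

    import org.apache.hadoop.ipc.CallerContext;

    CallerContext ctx = new CallerContext.Builder("clientContext")
        .append(CallerContext.CLIENT_ID_STR, "aabbccddeeff00112233445566778899")
        .append(CallerContext.CLIENT_CALL_ID_STR, "12345")
        .build();
    CallerContext.setCurrent(ctx);
    // ctx.getContext() now reads
    // "clientContext,clientId:aabbccddeeff00112233445566778899,clientCallId:12345"
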
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
index 3d64a84bfb4..624cc08ac25 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
@@ -55,14 +55,14 @@ public class RetryCache {
     /**
      * Processing state of the requests.
      */
-    private static byte INPROGRESS = 0;
-    private static byte SUCCESS = 1;
-    private static byte FAILED = 2;
+    private static final byte INPROGRESS = 0;
+    private static final byte SUCCESS = 1;
+    private static final byte FAILED = 2;
 
     private byte state = INPROGRESS;
     
     // Store uuid as two long for better memory utilization
-    private final long clientIdMsb; // Most signficant bytes
+    private final long clientIdMsb; // Most significant bytes
     private final long clientIdLsb; // Least significant bytes
     
     private final int callId;
@@ -140,8 +140,8 @@ public class RetryCache {
     
     @Override
     public String toString() {
-      return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
-          + this.callId + ":" + this.state;
+      return String.format("%s:%s:%s", new UUID(this.clientIdMsb, this.clientIdLsb),
+          this.callId, this.state);
     }
   }
 
@@ -183,7 +183,7 @@ public class RetryCache {
 
   private final LightWeightGSet<CacheEntry, CacheEntry> set;
   private final long expirationTime;
-  private String cacheName;
+  private final String cacheName;
 
   private final ReentrantLock lock = new ReentrantLock();
 
@@ -195,7 +195,7 @@ public class RetryCache {
    */
   public RetryCache(String cacheName, double percentage, long expirationTime) {
     int capacity = LightWeightGSet.computeCapacity(percentage, cacheName);
-    capacity = capacity > MAX_CAPACITY ? capacity : MAX_CAPACITY;
+    capacity = Math.max(capacity, MAX_CAPACITY);
     this.set = new LightWeightCache<CacheEntry, CacheEntry>(capacity, capacity,
         expirationTime, 0);
     this.expirationTime = expirationTime;
@@ -203,11 +203,11 @@ public class RetryCache {
     this.retryCacheMetrics =  RetryCacheMetrics.create(this);
   }
 
-  private static boolean skipRetryCache() {
+  private static boolean skipRetryCache(byte[] clientId, int callId) {
     // Do not track non RPC invocation or RPC requests with
     // invalid callId or clientId in retry cache
-    return !Server.isRpcInvocation() || Server.getCallId() < 0
-        || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
+    return !Server.isRpcInvocation() || callId < 0
+        || Arrays.equals(clientId, RpcConstants.DUMMY_CLIENT_ID);
   }
 
   public void lock() {
@@ -332,43 +332,51 @@ public class RetryCache {
     retryCacheMetrics.incrCacheUpdated();
   }
 
-  private static CacheEntry newEntry(long expirationTime) {
-    return new CacheEntry(Server.getClientId(), Server.getCallId(),
+  private static CacheEntry newEntry(long expirationTime,
+      byte[] clientId, int callId) {
+    return new CacheEntry(clientId, callId,
         System.nanoTime() + expirationTime);
   }
 
   private static CacheEntryWithPayload newEntry(Object payload,
-      long expirationTime) {
-    return new CacheEntryWithPayload(Server.getClientId(), Server.getCallId(),
+      long expirationTime, byte[] clientId, int callId) {
+    return new CacheEntryWithPayload(clientId, callId,
         payload, System.nanoTime() + expirationTime);
   }
 
   /**
    * Static method that provides null check for retryCache.
    * @param cache input Cache.
+   * @param clientId client id of this request.
+   * @param callId client call id of this request.
    * @return CacheEntry.
    */
-  public static CacheEntry waitForCompletion(RetryCache cache) {
-    if (skipRetryCache()) {
+  public static CacheEntry waitForCompletion(RetryCache cache,
+      byte[] clientId, int callId) {
+    if (skipRetryCache(clientId, callId)) {
       return null;
     }
     return cache != null ? cache
-        .waitForCompletion(newEntry(cache.expirationTime)) : null;
+        .waitForCompletion(newEntry(cache.expirationTime,
+            clientId, callId)) : null;
   }
 
   /**
    * Static method that provides null check for retryCache.
    * @param cache input cache.
    * @param payload input payload.
+   * @param clientId client id of this request.
+   * @param callId client call id of this request.
    * @return CacheEntryWithPayload.
    */
   public static CacheEntryWithPayload waitForCompletion(RetryCache cache,
-      Object payload) {
-    if (skipRetryCache()) {
+      Object payload, byte[] clientId, int callId) {
+    if (skipRetryCache(clientId, callId)) {
       return null;
     }
     return (CacheEntryWithPayload) (cache != null ? cache
-        .waitForCompletion(newEntry(payload, cache.expirationTime)) : null);
+        .waitForCompletion(newEntry(payload, cache.expirationTime,
+            clientId, callId)) : null);
   }
 
   public static void setState(CacheEntry e, boolean success) {
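
With the identifiers passed in explicitly, the server-side idempotency pattern reads as below; this is a sketch of the default case where the ids still come from the RPC layer (NameNodeRpcServer further down substitutes the proxied client's ids instead):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RetryCache;
    import org.apache.hadoop.ipc.RetryCache.CacheEntry;
    import org.apache.hadoop.ipc.Server;

    void idempotentOp(RetryCache retryCache) throws IOException {
      CacheEntry entry = RetryCache.waitForCompletion(
          retryCache, Server.getClientId(), Server.getCallId());
      if (entry != null && entry.isSuccess()) {
        return; // a retry of a completed call: reuse the previous outcome
      }
      boolean success = false;
      try {
        // ... perform the mutation ...
        success = true;
      } finally {
        RetryCache.setState(entry, success);
      }
    }
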
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
index 64607deb908..b789ada5271 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
@@ -50,14 +50,14 @@ public class TestRetryCache {
   static class TestServer {
     AtomicInteger retryCount = new AtomicInteger();
     AtomicInteger operationCount = new AtomicInteger();
-    private RetryCache retryCache = new RetryCache("TestRetryCache", 1,
-        100 * 1000 * 1000 * 1000L);
+    private final RetryCache retryCache = new RetryCache(
+        "TestRetryCache", 1, 100 * 1000 * 1000 * 1000L);
 
     /**
      * A server method implemented using {@link RetryCache}.
      * 
      * @param input is returned back in echo, if {@code success} is true.
-     * @param failureOuput returned on failure, if {@code success} is false.
+     * @param failureOutput returned on failure, if {@code success} is false.
      * @param methodTime time taken by the operation. By passing smaller/larger
      *          value one can simulate an operation that takes short/long time.
      * @param success whether this operation completes successfully or not
@@ -67,7 +67,7 @@ public class TestRetryCache {
     int echo(int input, int failureOutput, long methodTime, boolean success)
         throws InterruptedException {
       CacheEntryWithPayload entry = RetryCache.waitForCompletion(retryCache,
-          null);
+          null, Server.getClientId(), Server.getCallId());
       if (entry != null && entry.isSuccess()) {
         System.out.println("retryCount incremented " + retryCount.get());
         retryCount.incrementAndGet();
@@ -173,16 +173,13 @@ public class TestRetryCache {
     final int failureOutput = input + 1;
     ExecutorService executorService = Executors
         .newFixedThreadPool(numberOfThreads);
-    List<Future<Integer>> list = new ArrayList<Future<Integer>>();
+    List<Future<Integer>> list = new ArrayList<>();
     for (int i = 0; i < numberOfThreads; i++) {
-      Callable<Integer> worker = new Callable<Integer>() {
-        @Override
-        public Integer call() throws Exception {
-          Server.getCurCall().set(call);
-          Assert.assertEquals(Server.getCurCall().get(), call);
-          int randomPause = pause == 0 ? pause : r.nextInt(pause);
-          return testServer.echo(input, failureOutput, randomPause, success);
-        }
+      Callable<Integer> worker = () -> {
+        Server.getCurCall().set(call);
+        Assert.assertEquals(Server.getCurCall().get(), call);
+        int randomPause = pause == 0 ? pause : r.nextInt(pause);
+        return testServer.echo(input, failureOutput, randomPause, success);
       };
       Future<Integer> submit = executorService.submit(worker);
       list.add(submit);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index ff90854ebb7..e90cc5fda41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -464,7 +465,7 @@ public class RouterRpcClient {
           + router.getRouterId());
     }
 
-    addClientIpToCallerContext();
+    addClientInfoToCallerContext();
 
     Object ret = null;
     if (rpcMonitor != null) {
@@ -584,12 +585,13 @@ public class RouterRpcClient {
   }
 
   /**
-   * For tracking which is the actual client address.
-   * It adds trace info "clientIp:ip" and "clientPort:port"
+   * For tracking some information about the actual client.
+   * It adds trace info "clientIp:ip", "clientPort:port",
+   * "clientId:id" and "clientCallId:callId"
    * in the caller context, removing the old values if they were
    * already present.
    */
-  private void addClientIpToCallerContext() {
+  private void addClientInfoToCallerContext() {
     CallerContext ctx = CallerContext.getCurrent();
     String origContext = ctx == null ? null : ctx.getContext();
     byte[] origSignature = ctx == null ? null : ctx.getSignature();
@@ -598,6 +600,10 @@ public class RouterRpcClient {
             .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
             .append(CallerContext.CLIENT_PORT_STR,
                 Integer.toString(Server.getRemotePort()))
+            .append(CallerContext.CLIENT_ID_STR,
+                StringUtils.byteToHexString(Server.getClientId()))
+            .append(CallerContext.CLIENT_CALL_ID_STR,
+                Integer.toString(Server.getCallId()))
             .setSignature(origSignature);
     // Append the original caller context
     if (origContext != null) {
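
The client id has to travel as text inside the context string, hence the hex encoding; NameNode.getClientIdAndCallId (further below) reverses it. A round-trip sketch with a fabricated 16-byte id:

    import java.util.Arrays;
    import java.util.Random;
    import org.apache.hadoop.util.StringUtils;

    byte[] clientId = new byte[16];            // a real id is the client's UUID bytes
    new Random(42).nextBytes(clientId);
    String hex = StringUtils.byteToHexString(clientId);  // what the router appends
    byte[] decoded = StringUtils.hexStringToByte(hex);   // what the namenode recovers
    assert Arrays.equals(clientId, decoded);
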
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java
new file mode 100644
index 00000000000..46a23549797
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRetryCache.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterRetryCache {
+  /** Federated HDFS cluster. */
+  private MiniRouterDFSCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    Configuration namenodeConf = new Configuration();
+    namenodeConf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
+    cluster = new MiniRouterDFSCluster(true, 1);
+    cluster.addNamenodeOverrides(namenodeConf);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Start routers with only an RPC service
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // Setup the mount table
+    cluster.installMockLocations();
+
+    // Make one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    cluster.waitActiveNamespaces();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test
+  public void testRetryCache() throws Exception {
+    RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
+    FileSystem routerFS = cluster.getRandomRouter().getFileSystem();
+    Path testDir = new Path("/target-ns0/testdir");
+    routerFS.mkdirs(testDir);
+    routerFS.setPermission(testDir, FsPermission.getDefault());
+
+    // Run as fake joe to authorize the test
+    UserGroupInformation joe =
+        UserGroupInformation.createUserForTesting("fake_joe",
+            new String[]{"fake_group"});
+    FileSystem joeFS = joe.doAs(
+        (PrivilegedExceptionAction<FileSystem>) () ->
+            FileSystem.newInstance(routerFS.getUri(), routerFS.getConf()));
+
+    Path renameSrc = new Path(testDir, "renameSrc");
+    Path renameDst = new Path(testDir, "renameDst");
+    joeFS.mkdirs(renameSrc);
+
+    assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+        cluster.getCluster().getNamesystem(0).getState());
+
+    int callId = Client.nextCallId();
+    Client.setCallIdAndRetryCount(callId, 0, null);
+    assertTrue(joeFS.rename(renameSrc, renameDst));
+
+    Client.setCallIdAndRetryCount(callId, 0, null);
+    assertTrue(joeFS.rename(renameSrc, renameDst));
+
+    String ns0 = cluster.getNameservices().get(0);
+    cluster.switchToStandby(ns0, NAMENODES[0]);
+    cluster.switchToActive(ns0, NAMENODES[1]);
+
+    assertEquals(HAServiceProtocol.HAServiceState.ACTIVE,
+        cluster.getCluster().getNamesystem(1).getState());
+
+    Client.setCallIdAndRetryCount(callId, 0, null);
+    assertTrue(joeFS.rename(renameSrc, renameDst));
+  }
+
+  @Test
+  public void testParseSpecialValue() {
+    String mockContent = "mockContent,clientIp:127.0.0.1," +
+        "clientCallId:12345,clientId:mockClientId";
+    String clientIp = NameNode.parseSpecialValue(mockContent, "clientIp:");
+    assertEquals("127.0.0.1", clientIp);
+
+    String clientCallId = NameNode.parseSpecialValue(
+        mockContent, "clientCallId:");
+    assertEquals("12345", clientCallId);
+
+    String clientId = NameNode.parseSpecialValue(mockContent, "clientId:");
+    assertEquals("mockClientId", clientId);
+
+    String clientRetryNum = NameNode.parseSpecialValue(
+        mockContent, "clientRetryNum:");
+    assertNull(clientRetryNum);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index f71145a4522..31cc18fc882 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -2053,6 +2053,8 @@ public class TestRouterRpc {
     final String logOutput = auditlog.getOutput();
     assertTrue(logOutput.contains("callerContext=clientIp:"));
     assertTrue(logOutput.contains(",clientContext"));
+    assertTrue(logOutput.contains(",clientId"));
+    assertTrue(logOutput.contains(",clientCallId"));
     assertTrue(verifyFileExists(routerFS, dirPath));
   }
 
@@ -2103,6 +2105,41 @@ public class TestRouterRpc {
     assertFalse(auditLog.getOutput().contains("clientPort:1234"));
   }
 
+  @Test
+  public void testAddClientIdAndCallIdToCallerContext() throws IOException {
+    GenericTestUtils.LogCapturer auditLog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+    // 1. ClientId and ClientCallId are not set on the client.
+    // Set client context.
+    CallerContext.setCurrent(
+        new CallerContext.Builder("clientContext").build());
+
+    // Create a directory via the router.
+    String dirPath = "/test";
+    routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
+
+    // The audit log should contain "clientId:" and "clientCallId:".
+    assertTrue(auditLog.getOutput().contains("clientId:"));
+    assertTrue(auditLog.getOutput().contains("clientCallId:"));
+    assertTrue(verifyFileExists(routerFS, dirPath));
+    auditLog.clearOutput();
+
+    // 2. ClientId and ClientCallId are set on the client.
+    // Reset client context.
+    CallerContext.setCurrent(
+        new CallerContext.Builder(
+            "clientContext,clientId:mockClientId,clientCallId:4321").build());
+
+    // Create a directory via the router.
+    routerProtocol.getFileInfo(dirPath);
+
+    // The audit log should not contain the original clientId and clientCallId
+    // set by the client.
+    assertFalse(auditLog.getOutput().contains("clientId:mockClientId"));
+    assertFalse(auditLog.getOutput().contains("clientCallId:4321"));
+  }
+
   @Test
   public void testContentSummaryWithSnapshot() throws Exception {
     DistributedFileSystem routerDFS = (DistributedFileSystem) routerFS;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 850b2fc5708..5bb6872e588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
@@ -30,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -107,7 +109,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Lists;
 
@@ -195,6 +196,9 @@ public class FSEditLog implements LogsPurgeable {
 
   protected final OpInstanceCache cache = new OpInstanceCache();
 
+  // Users who can override the client info
+  private final String[] ipProxyUsers;
+
   /**
    * The edit directories that are shared between primary and secondary.
    */
@@ -246,6 +250,7 @@ public class FSEditLog implements LogsPurgeable {
    * @param editsDirs List of journals to use
    */
   FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
+    ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
     isSyncRunning = false;
     this.conf = conf;
     this.storage = storage;
@@ -799,8 +804,10 @@ public class FSEditLog implements LogsPurgeable {
   /** Record the RPC IDs if necessary */
   private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
     if (toLogRpcIds) {
-      op.setRpcClientId(Server.getClientId());
-      op.setRpcCallId(Server.getCallId());
+      Pair<byte[], Integer> clientIdAndCallId =
+          NameNode.getClientIdAndCallId(this.ipProxyUsers);
+      op.setRpcClientId(clientIdAndCallId.getLeft());
+      op.setRpcCallId(clientIdAndCallId.getRight());
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 0610048ccac..63c7721b749 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.util.Preconditions;
 
@@ -494,6 +497,94 @@ public class NameNode extends ReconfigurableBase implements
     return metrics;
   }
 
+  /**
+   * Try to obtain the actual client info according to the current user.
+   * @param ipProxyUsers Users who can override the client info.
+   */
+  private static String clientInfoFromContext(
+      final String[] ipProxyUsers) {
+    if (ipProxyUsers != null) {
+      UserGroupInformation user =
+          UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
+      if (user != null &&
+          ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
+        CallerContext context = CallerContext.getCurrent();
+        if (context != null && context.isContextValid()) {
+          return context.getContext();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Try to obtain the value corresponding to the key by parsing the content.
+   * @param content the full content to be parsed.
+   * @param key the key whose value is to be obtained.
+   * @return the value corresponding to the key.
+   */
+  @VisibleForTesting
+  public static String parseSpecialValue(String content, String key) {
+    int posn = content.indexOf(key);
+    if (posn != -1) {
+      posn += key.length();
+      int end = content.indexOf(",", posn);
+      return end == -1 ? content.substring(posn)
+          : content.substring(posn, end);
+    }
+    return null;
+  }
+
+  /**
+   * Try to obtain the actual client's machine according to the current user.
+   * @param ipProxyUsers Users who can override the client info.
+   * @return The actual client's machine.
+   */
+  public static String getClientMachine(final String[] ipProxyUsers) {
+    String cc = clientInfoFromContext(ipProxyUsers);
+    if (cc != null) {
+      // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
+      // return "1.2.3.4" as the client machine.
+      String key = CallerContext.CLIENT_IP_STR +
+          CallerContext.Builder.KEY_VALUE_SEPARATOR;
+      return parseSpecialValue(cc, key);
+    }
+
+    String clientMachine = Server.getRemoteAddress();
+    if (clientMachine == null) { //not a RPC client
+      clientMachine = "";
+    }
+    return clientMachine;
+  }
+
+  /**
+   * Try to obtain the actual client's id and call id
+   * according to the current user.
+   * @param ipProxyUsers Users who can override the client info.
+   * @return The actual client's id and call id.
+   */
+  public static Pair<byte[], Integer> getClientIdAndCallId(
+      final String[] ipProxyUsers) {
+    byte[] clientId = Server.getClientId();
+    int callId = Server.getCallId();
+    String cc = clientInfoFromContext(ipProxyUsers);
+    if (cc != null) {
+      String clientIdKey = CallerContext.CLIENT_ID_STR +
+          CallerContext.Builder.KEY_VALUE_SEPARATOR;
+      String clientIdStr = parseSpecialValue(cc, clientIdKey);
+      if (clientIdStr != null) {
+        clientId = StringUtils.hexStringToByte(clientIdStr);
+      }
+      String callIdKey = CallerContext.CLIENT_CALL_ID_STR +
+          CallerContext.Builder.KEY_VALUE_SEPARATOR;
+      String callIdStr = parseSpecialValue(cc, callIdKey);
+      if (callIdStr != null) {
+        callId = Integer.parseInt(callIdStr);
+      }
+    }
+    return Pair.of(clientId, callId);
+  }
+
   /**
    * Returns object used for reporting namenode startup progress.
    * 
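
A sketch of how a server-side call site consumes the new helper; ipProxyUsers holds the dfs.namenode.ip-proxy-users setting (DFS_NAMENODE_IP_PROXY_USERS, statically imported here), the override only kicks in when the immediate caller is one of those users, and `conf` is assumed to be the server's Configuration:

    import org.apache.commons.lang3.tuple.Pair;

    String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
    Pair<byte[], Integer> ids = NameNode.getClientIdAndCallId(ipProxyUsers);
    byte[] clientId = ids.getLeft();  // the proxied client's id when overridden,
    int callId = ids.getRight();      // otherwise the RPC layer's own values
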
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1d50bc5cb53..b64530337ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -46,8 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.ipc.CallerContext;
+import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -271,7 +270,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
   private final String defaultECPolicyName;
 
-  // Users who can override the client ip
+  // Users who can override the client info
   private final String[] ipProxyUsers;
 
   public NameNodeRpcServer(Configuration conf, NameNode nn)
@@ -711,8 +710,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if(!nn.isRole(NamenodeRole.NAMENODE))
       throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
 
-    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
-      null);
+    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (NamenodeCommand) cacheEntry.getPayload();
     }
@@ -725,13 +723,33 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return ret;
   }
 
+  /**
+   * Return the current CacheEntry.
+   */
+  private CacheEntry getCacheEntry() {
+    Pair<byte[], Integer> clientInfo =
+        NameNode.getClientIdAndCallId(this.ipProxyUsers);
+    return RetryCache.waitForCompletion(
+        retryCache, clientInfo.getLeft(), clientInfo.getRight());
+  }
+
+  /**
+   * Return the current CacheEntryWithPayload.
+   */
+  private CacheEntryWithPayload getCacheEntryWithPayload(Object payload) {
+    Pair<byte[], Integer> clientInfo =
+        NameNode.getClientIdAndCallId(this.ipProxyUsers);
+    return RetryCache.waitForCompletion(retryCache, payload,
+        clientInfo.getLeft(), clientInfo.getRight());
+  }
+
   @Override // NamenodeProtocol
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
     String operationName = "endCheckpoint";
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -801,7 +819,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (HdfsFileStatus) cacheEntry.getPayload();
     }
@@ -832,8 +850,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           +src+" for "+clientName+" at "+clientMachine);
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
-        null);
+    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (LastBlockWithStatus) cacheEntry.getPayload();
     }
@@ -999,7 +1016,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -1044,7 +1061,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
     }
@@ -1067,7 +1084,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     stateChangeLog.debug("*DIR* NameNode.concat: src path {} to" +
         " target path {}", Arrays.toString(src), trg);
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -1093,7 +1110,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -1130,7 +1147,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + ", recursive=" + recursive);
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
     }
@@ -1315,7 +1332,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
     checkNNStartup();
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
     }
@@ -1503,7 +1520,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void satisfyStoragePolicy(String src) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -1550,7 +1567,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       boolean createParent) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -1920,34 +1937,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
   }
 
+  /**
+   * Get the actual client's machine.
+   */
   private String getClientMachine() {
-    if (ipProxyUsers != null) {
-      // Get the real user (or effective if it isn't a proxy user)
-      UserGroupInformation user =
-          UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
-      if (user != null &&
-          ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
-        CallerContext context = CallerContext.getCurrent();
-        if (context != null && context.isContextValid()) {
-          String cc = context.getContext();
-          // if the rpc has a caller context of "clientIp:1.2.3.4,CLI",
-          // return "1.2.3.4" as the client machine.
-          String key = CallerContext.CLIENT_IP_STR +
-              CallerContext.Builder.KEY_VALUE_SEPARATOR;
-          int posn = cc.indexOf(key);
-          if (posn != -1) {
-            posn += key.length();
-            int end = cc.indexOf(",", posn);
-            return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
-          }
-        }
-      }
-    }
-    String clientMachine = Server.getRemoteAddress();
-    if (clientMachine == null) { //not a RPC client
-      clientMachine = "";
-    }
-    return clientMachine;
+    return NameNode.getClientMachine(this.ipProxyUsers);
   }
 
   @Override
@@ -1967,8 +1961,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
-        null);
+    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (String) cacheEntry.getPayload();
     }
@@ -1995,7 +1988,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrDeleteSnapshotOps();
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2037,7 +2030,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
     namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrRenameSnapshotOps();
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2098,8 +2091,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
-      (retryCache, null);
+    CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (Long) cacheEntry.getPayload();
     }
@@ -2120,7 +2112,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2138,7 +2130,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void removeCacheDirective(long id) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2165,7 +2157,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void addCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2182,7 +2174,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void modifyCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2199,7 +2191,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void removeCachePool(String cachePoolName) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2262,7 +2254,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2294,7 +2286,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       final ReencryptAction action) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2318,7 +2310,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {
     checkNNStartup();
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2342,7 +2334,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2372,7 +2364,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.WRITE);
-    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
     }
@@ -2555,7 +2547,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void unsetErasureCodingPolicy(String src) throws IOException {
     checkNNStartup();
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2581,8 +2573,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     String operationName = "addErasureCodingPolicies";
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
-    final CacheEntryWithPayload cacheEntry =
-        RetryCache.waitForCompletion(retryCache, null);
+    final CacheEntryWithPayload cacheEntry = getCacheEntryWithPayload(null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (AddErasureCodingPolicyResponse[]) cacheEntry.getPayload();
     }
@@ -2605,7 +2596,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     String operationName = "removeErasureCodingPolicy";
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2624,7 +2615,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     String operationName = "enableErasureCodingPolicy";
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
@@ -2643,7 +2634,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     String operationName = "disableErasureCodingPolicy";
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
-    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    final CacheEntry cacheEntry = getCacheEntry();
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
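
Taken together, a condensed sketch of the failover scenario the new TestRouterRetryCache exercises (cluster setup and assertions elided; fs stands for the proxy-user FileSystem from the test):

    RetryInvocationHandler.SET_CALL_ID_FOR_TEST.set(false);
    int callId = Client.nextCallId();

    Client.setCallIdAndRetryCount(callId, 0, null);
    fs.rename(renameSrc, renameDst); // executed; the real client id and callId
                                     // are recorded in the edit log
    Client.setCallIdAndRetryCount(callId, 0, null);
    fs.rename(renameSrc, renameDst); // replay: retry-cache hit on the active NN

    // Fail over ns0 to the other namenode, then replay once more.
    Client.setCallIdAndRetryCount(callId, 0, null);
    fs.rename(renameSrc, renameDst); // still succeeds: the new active rebuilt its
                                     // retry cache from the edit-log entries above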

