Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/11/27 00:47:43 UTC

[hadoop] branch branch-2.10 updated: HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 44285e3  HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.
44285e3 is described below

commit 44285e38826a9554d8dfb62dba21e3e5a37065db
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Nov 8 08:57:14 2019 -0800

    HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.
    
    (cherry picked from b2cc8b6b4a78f31cdd937dc4d1a2255f80c5881e)
    (cherry picked from 60655bfe54e138957ef5bbf480a4541bd83152fd)
    (cherry picked from 17779adb32569bd689a93802512003b6a6816bd4)
    (cherry picked from 403892264b08d68a4060fafa9959fd511864d6e5)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   8 ++
 .../hadoop/hdfs/server/balancer/Dispatcher.java    |  37 +------
 .../hdfs/server/balancer/NameNodeConnector.java    |  16 +++
 .../org/apache/hadoop/hdfs/util/RateLimiter.java   | 104 ++++++++++++++++++++
 .../src/main/resources/hdfs-default.xml            |  11 ++-
 .../hadoop/hdfs/server/balancer/TestBalancer.java  |  80 ++++++++++-----
 .../hdfs/server/balancer/TestBalancerRPCDelay.java |  28 +++++-
 .../apache/hadoop/hdfs/util/TestRateLimiter.java   | 107 +++++++++++++++++++++
 8 files changed, 332 insertions(+), 59 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 2f47ada..aff6b8b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -498,6 +498,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.metrics.logger.period.seconds";
   public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
       600;
+  /**
+   * The maximum number of getBlocks RPCs data movement utilities can make to
+   * a NameNode per second. Values <= 0 disable throttling. This affects
+   * anything that uses a NameNodeConnector, i.e., the Balancer, Mover,
+   * and StoragePolicySatisfier.
+   */
+  public static final String  DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps";
+  public static final int     DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20;
 
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
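
For illustration (the values below are hypothetical, not part of this
commit), the new key can be overridden through the standard Configuration
API, the same pattern the updated TestBalancer uses:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Allow up to 50 getBlocks calls per second per NameNode:
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, 50);
    // Or disable the throttle entirely; any value <= 0 turns it off:
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, -1);
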
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 444984b..d574750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -821,10 +821,8 @@ public class Dispatcher {
      * move tasks or it has received enough blocks from the namenode, or the
      * elapsed time of the iteration has exceeded the max time limit.
      *
-     * @param delay - time to sleep before sending getBlocks. Intended to
-     * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
      */
-    private void dispatchBlocks(long delay) {
+    private void dispatchBlocks() {
       this.blocksToReceive = 2 * getScheduledSize();
       long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
@@ -849,25 +847,15 @@ public class Dispatcher {
         if (shouldFetchMoreBlocks()) {
           // fetch new blocks
           try {
-            if(delay > 0) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Sleeping " + delay + "  msec.");
-              }
-              Thread.sleep(delay);
-            }
             final long received = getBlockList();
             if (received == 0) {
               return;
             }
             blocksToReceive -= received;
             continue;
-          } catch (InterruptedException ignored) {
-            // nothing to do
           } catch (IOException e) {
             LOG.warn("Exception while getting block list", e);
             return;
-          } finally {
-            delay = 0L;
           }
         } else {
           // jump out of while-loop after the configured timeout.
@@ -1060,12 +1048,6 @@ public class Dispatcher {
   }
 
   /**
-   * The best-effort limit on the number of RPCs per second
-   * the Balancer will send to the NameNode.
-   */
-  final static int BALANCER_NUM_RPC_PER_SEC = 20;
-
-  /**
    * Dispatch block moves for each source. The thread selects blocks to move &
    * sends request to proxy source to initiate block move. The process is flow
    * controlled. Block selection is blocked if there are too many un-confirmed
@@ -1080,12 +1062,7 @@ public class Dispatcher {
     int concurrentThreads = Math.min(sources.size(),
         ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
     assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
-      LOG.debug("Balancer concurrent threads = " + concurrentThreads);
-      LOG.debug("Disperse Interval sec = " +
-          concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
-    }
+    LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
 
     // Determine the size of each mover thread pool per target
     int threadsPerTarget = maxMoverThreads/targets.size();
@@ -1105,23 +1082,15 @@ public class Dispatcher {
       LOG.info("Allocating " + threadsPerTarget + " threads per target.");
     }
 
-    long dSec = 0;
     final Iterator<Source> i = sources.iterator();
     for (int j = 0; j < futures.length; j++) {
       final Source s = i.next();
-      final long delay = dSec * 1000;
       futures[j] = dispatchExecutor.submit(new Runnable() {
         @Override
         public void run() {
-          s.dispatchBlocks(delay);
+          s.dispatchBlocks();
         }
       });
-      // Calculate delay in seconds for the next iteration
-      if(j >= concurrentThreads) {
-        dSec = 0;
-      } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
-        dSec++;
-      }
     }
 
     // wait for all dispatcher threads to finish
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index a8f0703..589b53a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.RateLimiter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
@@ -114,6 +116,7 @@ public class NameNodeConnector implements Closeable {
 
   private final int maxNotChangedIterations;
   private int notChangedIterations = 0;
+  private final RateLimiter getBlocksRateLimiter;
 
   public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
                            List<Path> targetPaths, Configuration conf,
@@ -124,6 +127,16 @@ public class NameNodeConnector implements Closeable {
     this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
         .asList(new Path("/")) : targetPaths;
     this.maxNotChangedIterations = maxNotChangedIterations;
+    int getBlocksMaxQps = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
+    if (getBlocksMaxQps > 0) {
+      LOG.info("getBlocks calls for " + nameNodeUri
+          + " will be rate-limited to " + getBlocksMaxQps + " per second");
+      this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps);
+    } else {
+      this.getBlocksRateLimiter = null;
+    }
 
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
@@ -159,6 +172,9 @@ public class NameNodeConnector implements Closeable {
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
       throws IOException {
+    if (getBlocksRateLimiter != null) {
+      getBlocksRateLimiter.acquire();
+    }
     return namenode.getBlocks(datanode, size);
   }
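
For a sense of scale (illustrative arithmetic, not from the commit): at the
default of 20 QPS, permits are spaced 1/20 s = 50 ms apart, so a Balancer
with 200 sources needs about 200 / 20 = 10 seconds to issue its first round
of getBlocks calls. Unlike the thread-start staggering removed from
Dispatcher above, which applied its delay only before each thread's first
fetch, this limit holds for every getBlocks call in the iteration.
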
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java
new file mode 100644
index 0000000..c7c5da5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.util.Timer;
+
+/**
+ * A rate limiter which loosely emulates the behavior of Guava's RateLimiter
+ * for branches which do not have that class available. The APIs are intended
+ * to match RateLimiter's to avoid code changes in calling classes when
+ * switching to Guava.
+ */
+public class RateLimiter {
+
+  private final Timer timer;
+  /** The period that should elapse between any two operations. */
+  private final long opDispersalPeriodNanos;
+
+  /** The last time an operation completed, in system nanos (not wall time). */
+  private final AtomicLong lastOpTimeNanos;
+
+  public static RateLimiter create(double maxOpsPerSecond) {
+    return new RateLimiter(new Timer(), maxOpsPerSecond);
+  }
+
+  RateLimiter(Timer timer, double maxOpsPerSecond) {
+    this.timer = timer;
+    if (maxOpsPerSecond <= 0) {
+      throw new IllegalArgumentException("RateLimiter max operations per "
+          + "second must be > 0 but was " + maxOpsPerSecond);
+    }
+    opDispersalPeriodNanos =
+        (long) (TimeUnit.SECONDS.toNanos(1) / maxOpsPerSecond);
+    lastOpTimeNanos = new AtomicLong(Long.MIN_VALUE);
+  }
+
+  /**
+   * Attempt to acquire a permit to perform an operation. This will block until
+   * enough time has elapsed since the last operation to perform another. No
+   * fairness is provided; acquisition attempts will be serviced in an arbitrary
+   * order rather than FIFO.
+   *
+   * @return The time, in seconds, it took to acquire a permit.
+   */
+  public double acquire() {
+    boolean interrupted = false;
+    long startTimeNanos = Long.MAX_VALUE;
+    try {
+      while (true) {
+        long currTimeNanos = timer.monotonicNowNanos();
+        startTimeNanos = Math.min(currTimeNanos, startTimeNanos);
+        long lastOpTimeLocal = lastOpTimeNanos.get();
+        long nextAllowedOpTime = lastOpTimeLocal + opDispersalPeriodNanos;
+        if (currTimeNanos >= nextAllowedOpTime) {
+          // enough time has elapsed; attempt to acquire the current permit
+          boolean acquired =
+              lastOpTimeNanos.compareAndSet(lastOpTimeLocal, currTimeNanos);
+          // if the CAS failed, another thread acquired the permit, try again
+          if (acquired) {
+            return (currTimeNanos - startTimeNanos)
+                / ((double) TimeUnit.SECONDS.toNanos(1));
+          }
+        } else {
+          interrupted |= sleep(nextAllowedOpTime - currTimeNanos);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        // allow other levels to be aware of the interrupt
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Sleep for some amount of nanoseconds. Returns true iff interrupted. */
+  boolean sleep(long sleepTimeNanos) {
+    long sleepTimeMillis = TimeUnit.NANOSECONDS.toMillis(sleepTimeNanos);
+    try {
+      Thread.sleep(sleepTimeMillis, (int) (sleepTimeNanos
+          - TimeUnit.MILLISECONDS.toNanos(sleepTimeMillis)));
+    } catch (InterruptedException ie) {
+      // swallow and continue, but allow the interrupt to be remembered
+      return true;
+    }
+    return false;
+  }
+}
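
A minimal usage sketch of the class above (illustrative only; the demo class
is not part of the commit). RateLimiter.create(10) yields a 100 ms dispersal
period, the same rate exercised by TestRateLimiter below:

    import org.apache.hadoop.hdfs.util.RateLimiter;

    public class RateLimiterDemo {
      public static void main(String[] args) {
        // Allow at most 10 operations per second across all threads.
        RateLimiter limiter = RateLimiter.create(10);
        for (int i = 0; i < 5; i++) {
          // Blocks until at least 100 ms has passed since the last permit;
          // returns the time spent waiting, in seconds.
          double waitedSec = limiter.acquire();
          System.out.println("op " + i + " waited " + waitedSec + " s");
        }
      }
    }
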
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d5f61e8..9c3ef5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3369,7 +3369,16 @@
     HTTPS port for DataNode.
   </description>
 </property>
-
+<property>
+  <name>dfs.namenode.get-blocks.max-qps</name>
+  <value>20</value>
+  <description>
+    The maximum number of getBlocks RPCs data movement utilities can make to
+    a NameNode per second. Values less than or equal to 0 disable throttling.
+    This affects anything that uses a NameNodeConnector, i.e., the Balancer,
+    Mover, and StoragePolicySatisfier.
+  </description>
+</property>
 <property>
   <name>dfs.balancer.dispatcherThreads</name>
   <value>200</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 2979111..c8456b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -70,6 +70,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -124,6 +126,7 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -153,6 +156,16 @@ public class TestBalancer {
   private static MiniKdc kdc;
   private static File keytabFile;
   private MiniDFSCluster cluster;
+  private AtomicInteger numGetBlocksCalls;
+  private AtomicLong startGetBlocksTime;
+  private AtomicLong endGetBlocksTime;
+
+  @Before
+  public void setup() {
+    numGetBlocksCalls = new AtomicInteger(0);
+    startGetBlocksTime = new AtomicLong(Long.MAX_VALUE);
+    endGetBlocksTime = new AtomicLong(Long.MIN_VALUE);
+  }
 
   static void initSecureConf(Configuration conf) throws Exception {
     baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
@@ -765,7 +778,7 @@ public class TestBalancer {
       long newCapacity, String newRack, NewNodeInfo nodes,
       boolean useTool, boolean useFile) throws Exception {
     doTest(conf, capacities, racks, newCapacity, newRack, nodes,
-        useTool, useFile, false);
+        useTool, useFile, false, 0.3);
   }
 
   /** This test starts a cluster with a specified number of nodes,
@@ -784,12 +797,14 @@ public class TestBalancer {
    * @param useFile - if true, the hosts to included or excluded will be stored in a
    *   file and then later read from the file.
    * @param useNamesystemSpy - spy on FSNamesystem if true
+   * @param clusterUtilization - The utilization of the cluster to start, from
+   *                             0.0 to 1.0
    * @throws Exception
    */
   private void doTest(Configuration conf, long[] capacities,
       String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
       boolean useTool, boolean useFile,
-      boolean useNamesystemSpy) throws Exception {
+      boolean useNamesystemSpy, double clusterUtilization) throws Exception {
     LOG.info("capacities = " +  long2String(capacities));
     LOG.info("racks      = " +  Arrays.asList(racks));
     LOG.info("newCapacity= " +  newCapacity);
@@ -819,8 +834,8 @@ public class TestBalancer {
 
       long totalCapacity = sum(capacities);
 
-      // fill up the cluster to be 30% full
-      long totalUsedSpace = totalCapacity*3/10;
+      // fill up the cluster to be `clusterUtilization` full
+      long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
       createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
           (short) numOfDatanodes, 0);
 
@@ -1938,33 +1953,48 @@ public class TestBalancer {
     }
   }
 
-  private static int numGetBlocksCalls;
-  private static long startGetBlocksTime, endGetBlocksTime;
-
   private void spyFSNamesystem(NameNode nn) throws IOException {
     FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
-    numGetBlocksCalls = 0;
-    endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
     doAnswer(new Answer<BlocksWithLocations>() {
       @Override
       public BlocksWithLocations answer(InvocationOnMock invocation)
           throws Throwable {
+        long startTime = Time.monotonicNow();
+        setAtomicLongToMinMax(startGetBlocksTime, startTime, false);
         BlocksWithLocations blk =
             (BlocksWithLocations)invocation.callRealMethod();
-        endGetBlocksTime = Time.monotonicNow();
-        numGetBlocksCalls++;
+        long endTime = Time.monotonicNow();
+        setAtomicLongToMinMax(endGetBlocksTime, endTime, true);
+        numGetBlocksCalls.incrementAndGet();
         return blk;
       }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
   }
 
+  private static void setAtomicLongToMinMax(AtomicLong value, long newVal,
+      boolean useMax) {
+    while (true) {
+      long currVal = value.get();
+      if ((useMax && newVal > currVal) || (!useMax && newVal < currVal)) {
+        if (value.compareAndSet(currVal, newVal)) {
+          return;
+        }
+      } else {
+        return;
+      }
+    }
+  }
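
setAtomicLongToMinMax above is a standard lock-free min/max update: read the
current value, check whether the new value improves it, then compareAndSet,
retrying if a concurrent writer won the race. A standalone max-only
equivalent (an illustrative sketch, not part of the commit):

    import java.util.concurrent.atomic.AtomicLong;

    final class AtomicMax {
      // Lock-free "raise to max": loop until the candidate no longer
      // improves the stored value or our CAS wins the race.
      static void max(AtomicLong target, long candidate) {
        long curr;
        while (candidate > (curr = target.get())
            && !target.compareAndSet(curr, candidate)) {
          // CAS lost to a concurrent update; re-read and retry.
        }
      }
    }
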
+
   /**
   * Test that makes the Balancer disperse RPCs to the NameNode
-   * in order to avoid NN's RPC queue saturation.
+   * in order to avoid NN's RPC queue saturation. This is not marked as @Test
+   * because it is run from {@link TestBalancerRPCDelay}.
    */
-  void testBalancerRPCDelay() throws Exception {
+  void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
     final Configuration conf = new HdfsConfiguration();
     initConf(conf);
     conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+        getBlocksMaxQps);
 
     int numDNs = 20;
     long[] capacities = new long[numDNs];
@@ -1974,16 +2004,22 @@ public class TestBalancer {
       racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
     }
     doTest(conf, capacities, racks, CAPACITY, RACK2,
-        new PortNumberBasedNodes(3, 0, 0), false, false, true);
+        // Use only 1 new node and set the starting utilization to 50% so that
+        // the balancing completes in a single iteration. This is necessary
+        // because startGetBlocksTime and endGetBlocksTime measure across all
+        // getBlocks calls, so if two iterations were performed, the duration
+        // would also include the time taken by the block move operations of
+        // the first iteration.
+        new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
     assertTrue("Number of getBlocks should be not less than " +
-        Dispatcher.BALANCER_NUM_RPC_PER_SEC,
-        numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
-    long d = 1 + endGetBlocksTime - startGetBlocksTime;
-    LOG.info("Balancer executed " + numGetBlocksCalls
-        + " getBlocks in " + d + " msec.");
-    assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
-        Dispatcher.BALANCER_NUM_RPC_PER_SEC,
-        (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
+        getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
+    long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
+    int durationSec = (int) Math.ceil(durationMs / 1000.0);
+    LOG.info("Balancer executed " + numGetBlocksCalls.get() + " getBlocks in "
+        + durationMs + " msec (rounded up to " + durationSec + " sec)");
+    long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec;
+    assertTrue("Expected balancer getBlocks calls per second <= " +
+        getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
   }
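
Worked example of the new assertion (the numbers are made up): if the spy
records 61 getBlocks calls over durationMs = 2800, then durationSec rounds
up to 3 and getBlockCallsPerSecond = 61 / 3 = 20, which passes at the
default limit of 20 but would fail at a limit of 3. Rounding the duration
up lowers the computed rate, so short measurement windows cannot produce a
false failure.
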
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
index 960ad25..79c7f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -25,8 +28,29 @@ import org.junit.Test;
  */
 public class TestBalancerRPCDelay {
 
+  private TestBalancer testBalancer;
+
+  @Before
+  public void setup() {
+    testBalancer = new TestBalancer();
+    testBalancer.setup();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (testBalancer != null) {
+      testBalancer.shutdown();
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testBalancerRPCDelayQps3() throws Exception {
+    testBalancer.testBalancerRPCDelay(3);
+  }
+
   @Test(timeout=100000)
-  public void testBalancerRPCDelay() throws Exception {
-    new TestBalancer().testBalancerRPCDelay();
+  public void testBalancerRPCDelayQpsDefault() throws Exception {
+    testBalancer.testBalancerRPCDelay(
+        DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java
new file mode 100644
index 0000000..e1c6af6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/** Tests for {@link RateLimiter}. */
+public class TestRateLimiter {
+
+  // epsilon of 1 ns
+  private static final double EPSILON = 1.0 / TimeUnit.SECONDS.toNanos(1);
+
+  @Test
+  public void testRateLimiter() {
+    FakeTimer timer = new FakeTimer();
+    Queue<Long> sleepTimesNanos = new LinkedList<>();
+    Queue<Long> advanceTimerNanos = new LinkedList<>();
+    RateLimiter limiter =
+        new TestingRateLimiter(timer, 10, sleepTimesNanos, advanceTimerNanos);
+
+    final long nanos100ms = TimeUnit.MILLISECONDS.toNanos(100);
+
+    // should be able to acquire immediately the first time
+    assertEquals(0.0, limiter.acquire(), EPSILON);
+    assertTrue(sleepTimesNanos.isEmpty());
+
+    // 100ms of sleep is required the second time
+    advanceTimerNanos.add(nanos100ms);
+    assertEquals(0.1, limiter.acquire(), EPSILON);
+    assertEquals(1, sleepTimesNanos.size());
+    assertNextValue(sleepTimesNanos, nanos100ms);
+
+    // test when it takes 2 sleep cycles to be able to acquire
+    advanceTimerNanos.add(nanos100ms / 2);
+    advanceTimerNanos.add(nanos100ms / 2);
+    assertEquals(0.1, limiter.acquire(), EPSILON);
+    assertEquals(2, sleepTimesNanos.size());
+    assertNextValue(sleepTimesNanos, nanos100ms);
+    assertNextValue(sleepTimesNanos, nanos100ms / 2);
+
+    // if some time passes between acquisitions, the next should be immediate
+    timer.advanceNanos(nanos100ms * 2);
+    assertEquals(0.0, limiter.acquire(), EPSILON);
+    assertTrue(sleepTimesNanos.isEmpty());
+
+    // the rate limiter has no memory, so although time passed, the next
+    // acquisition is still rate limited
+    advanceTimerNanos.add(nanos100ms);
+    assertEquals(0.1, limiter.acquire(), EPSILON);
+    assertEquals(1, sleepTimesNanos.size());
+    assertNextValue(sleepTimesNanos, nanos100ms);
+  }
+
+  private static void assertNextValue(Queue<Long> queue, long expected) {
+    Long value = queue.poll();
+    assertNotNull(value);
+    assertEquals(expected, value.longValue());
+  }
+
+  private static class TestingRateLimiter extends RateLimiter {
+
+    private final FakeTimer fakeTimer;
+    private final Queue<Long> sleepTimesNanos;
+    private final Queue<Long> advanceTimerNanos;
+
+    TestingRateLimiter(FakeTimer fakeTimer, double maxOpsPerSecond,
+        Queue<Long> sleepTimesNanos, Queue<Long> advanceTimerNanos) {
+      super(fakeTimer, maxOpsPerSecond);
+      this.fakeTimer = fakeTimer;
+      this.sleepTimesNanos = sleepTimesNanos;
+      this.advanceTimerNanos = advanceTimerNanos;
+    }
+
+    @Override
+    boolean sleep(long sleepTimeNanos) {
+      sleepTimesNanos.offer(sleepTimeNanos);
+      Long advanceNanos = advanceTimerNanos.poll();
+      if (advanceNanos == null) {
+        fail("Unexpected sleep; no timer advance value found");
+      }
+      fakeTimer.advanceNanos(advanceNanos);
+      return false;
+    }
+  }
+}

