Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/10/01 16:42:54 UTC

[hadoop] branch branch-3.2 updated: HDFS-14202. dfs.disk.balancer.max.disk.throughputInMBperSec property is not working as per set value. Contributed by Ranith Sardar.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new fc8a7a9  HDFS-14202. dfs.disk.balancer.max.disk.throughputInMBperSec property is not working as per set value. Contributed by Ranith Sardar.
fc8a7a9 is described below

commit fc8a7a9e5b086b1d385fe01207deda000dfcdb57
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Mon Feb 4 11:59:48 2019 -0800

    HDFS-14202. dfs.disk.balancer.max.disk.throughputInMBperSec property is not working as per set value. Contributed by Ranith Sardar.
    
    (cherry picked from commit 0e79a865822eed05f3f8433976b2cfef8f427f25)
---
 .../hadoop/hdfs/server/datanode/DiskBalancer.java  | 21 ++++----
 .../hdfs/server/diskbalancer/TestDiskBalancer.java | 63 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 9 deletions(-)
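
In short: the old computeDelay() mixed units. It subtracted a throughput
(MB/sec) from bytesInMB / bandwidth (a time in seconds), so the result was
almost always zero or negative and the configured ceiling was never
enforced; it also divided by zero for copies finishing in under one second,
and the caller passed timeUsed in nanoseconds where milliseconds were
expected. The patch recomputes the delay as the time the copy should have
taken at the configured bandwidth minus the time it actually took, and
converts timeUsed to milliseconds at the call site. A minimal standalone
sketch of the before/after arithmetic (not the committed code; the class
name and hard-coded inputs are illustrative, borrowed from the new test
below):

    import java.util.concurrent.TimeUnit;

    public class DelayMathSketch {
      public static void main(String[] args) {
        long bytesInMB = 20;      // 20 MB copied
        long timeUsedMs = 1200;   // the copy took 1200 ms
        long bandwidthMBs = 10;   // configured ceiling: 10 MB/sec

        // Old formula: subtracts a throughput (MB/sec) from a time-like
        // quantity, so the delay is usually <= 0 and no sleep happens.
        long seconds = TimeUnit.SECONDS.convert(timeUsedMs,
            TimeUnit.MILLISECONDS);                           // 1
        long lastThroughput = bytesInMB / seconds;            // 20 MB/sec
        long oldDelay = (bytesInMB / bandwidthMBs) - lastThroughput;
        System.out.println(Math.max(0, oldDelay));            // 0: no throttle

        // New formula: time the copy should have taken at the ceiling,
        // minus the time it actually took.
        float bandwidthMBms = bandwidthMBs / 1000f;           // 0.01 MB/ms
        long newDelay = (long) (bytesInMB / bandwidthMBms) - timeUsedMs;
        System.out.println(Math.max(0, newDelay));            // 800 ms
      }
    }

For 20 MB copied in 1200 ms against a 10 MB/sec ceiling, the old formula
yields -18 (clamped to no sleep at all), while the new one yields the
800 ms sleep that the new test asserts.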

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index 8d5660e..9183344 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -57,9 +57,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Worker class for Disk Balancer.
  * <p>
@@ -864,7 +861,8 @@ public class DiskBalancer {
      * @param item        DiskBalancerWorkItem
      * @return sleep delay in Milliseconds.
      */
-    private long computeDelay(long bytesCopied, long timeUsed,
+    @VisibleForTesting
+    public long computeDelay(long bytesCopied, long timeUsed,
                               DiskBalancerWorkItem item) {
 
       // we had an overflow, ignore this reading and continue.
@@ -872,11 +870,15 @@ public class DiskBalancer {
         return 0;
       }
       final int megaByte = 1024 * 1024;
+      if (bytesCopied < megaByte) {
+        return 0;
+      }
       long bytesInMB = bytesCopied / megaByte;
-      long lastThroughput = bytesInMB / SECONDS.convert(timeUsed,
-          TimeUnit.MILLISECONDS);
-      long delay = (bytesInMB / getDiskBandwidth(item)) - lastThroughput;
-      return (delay <= 0) ? 0 : MILLISECONDS.convert(delay, TimeUnit.SECONDS);
+
+      // convert the disk bandwidth from MB/sec to MB/millisec
+      float bandwidth = getDiskBandwidth(item) / 1000f;
+      float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
+      return (delay <= 0) ? 0 : (long) delay;
     }
 
     /**
@@ -1112,7 +1114,8 @@ public class DiskBalancer {
             // to make sure that our promise is good on average.
             // Because we sleep, if a shutdown or cancel call comes in
             // we exit via Thread Interrupted exception.
-            Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
+            Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
+                .toMillis(timeUsed), item));
 
             // We delay updating the info to avoid confusing the user.
             // This way we report the copy only if it is under the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index e789694..af17e3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -222,6 +222,69 @@ public class TestDiskBalancer {
 
   }
 
+  @Test
+  public void testDiskBalancerComputeDelay() throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+    final int blockCount = 100;
+    final int blockSize = 11 * 1024 * 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final long cap = blockSize * 2L * blockCount;
+
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+
+    final MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount).setBlockSize(blockSize)
+        .setDiskCount(diskCount).setNumDatanodes(dataNodeCount).setConf(conf)
+        .setCapacities(new long[] {cap, cap }).build();
+
+    try {
+      DataNode node = cluster.getDataNodes().get(dataNodeIndex);
+
+      final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
+      DiskBalancerWorkItem item = Mockito.spy(new DiskBalancerWorkItem());
+      // Mock the work item's bandwidth as 10 MB/sec.
+      Mockito.doReturn((long) 10).when(item).getBandwidth();
+
+      doAnswer(new Answer<Object>() {
+        public Object answer(InvocationOnMock invocation) {
+          try {
+            node.getFSDataset().moveBlockAcrossVolumes(
+                (ExtendedBlock) invocation.getArguments()[0],
+                (FsVolumeSpi) invocation.getArguments()[1]);
+          } catch (Exception e) {
+            LOG.error(e.getMessage());
+          }
+          return null;
+        }
+      }).when(fsDatasetSpy).moveBlockAcrossVolumes(any(ExtendedBlock.class),
+          any(FsVolumeSpi.class));
+
+      DiskBalancerMover diskBalancerMover = new DiskBalancerMover(fsDatasetSpy,
+          conf);
+
+      diskBalancerMover.setRunnable();
+
+      // bytesCopied = 20 * 1024 * 1024 bytes; timeUsed = 1200 milliseconds.
+      // item - DiskBalancerWorkItem with bandwidth mocked as 10 MB/sec.
+      // Expected sleep delay in milliseconds:
+      //   bytesCopied / (1024 * 1024 * bandwidth in MB/ms) - timeUsed
+      //   = 2000 - 1200 = 800.
+      long val = diskBalancerMover.computeDelay(20 * 1024 * 1024, 1200, item);
+      Assert.assertEquals(800L, val);
+    } catch (Exception e) {
+      Assert.fail("Unexpected exception: " + e);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
   @Test
   public void testDiskBalancerWithFedClusterWithOneNameServiceEmpty() throws


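The new test exercises computeDelay() through a real DiskBalancerMover on a
MiniDFSCluster, but the asserted value can be sanity-checked in isolation.
A minimal sketch that mirrors the patched arithmetic, with the work item
reduced to a plain bandwidth parameter (the class and method here are
illustrative, not Hadoop API):

    public class ComputeDelaySketch {
      // Mirror of the patched computeDelay(): returns the sleep in ms
      // needed to keep the copy at or under bandwidthMBs MB/sec.
      static long computeDelay(long bytesCopied, long timeUsedMs,
                               long bandwidthMBs) {
        final int megaByte = 1024 * 1024;
        if (bytesCopied < megaByte) {
          return 0;                                 // too little data to meter
        }
        long bytesInMB = bytesCopied / megaByte;
        float bandwidthMBms = bandwidthMBs / 1000f; // MB per millisecond
        float delay = (long) (bytesInMB / bandwidthMBms) - timeUsedMs;
        return (delay <= 0) ? 0 : (long) delay;
      }

      public static void main(String[] args) {
        // Same inputs as testDiskBalancerComputeDelay: 20 MB copied in
        // 1200 ms against a 10 MB/sec ceiling.
        System.out.println(computeDelay(20 * 1024 * 1024, 1200, 10)); // 800
      }
    }

In a deployment the ceiling comes from the
dfs.disk.balancer.max.disk.throughputInMBperSec property named in the
subject (the test mocks the work item's bandwidth at 10 MB/sec instead).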