Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2021/04/23 03:58:50 UTC

[hadoop] branch branch-3.3 updated: HDFS-15989. Split TestBalancer and De-flake testMaxIterationTime() (#2942)

This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 96a1dfa  HDFS-15989. Split TestBalancer and De-flake testMaxIterationTime() (#2942)
96a1dfa is described below

commit 96a1dfa313f76e58d70aab0e8d3cf5926021c489
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Fri Apr 23 09:27:23 2021 +0530

    HDFS-15989. Split TestBalancer and De-flake testMaxIterationTime() (#2942)
    
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    Signed-off-by: Takanobu Asanuma <ta...@apache.org>
---
 .../hadoop/hdfs/server/balancer/TestBalancer.java  | 498 ----------------
 .../balancer/TestBalancerLongRunningTasks.java     | 627 +++++++++++++++++++++
 2 files changed, 627 insertions(+), 498 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index f44bbb2..f253b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
-import static org.apache.hadoop.fs.StorageType.RAM_DISK;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -28,22 +27,16 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KERBEROS_PRINCIP
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
 
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.junit.AfterClass;
@@ -56,10 +49,8 @@ import static org.mockito.Mockito.doAnswer;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -86,8 +77,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -107,14 +96,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -185,7 +169,6 @@ public class TestBalancer {
   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 100;
-  static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
   private static final Random r = new Random();
 
   static {
@@ -211,20 +194,6 @@ public class TestBalancer {
     conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
   }
 
-  static void initConfWithRamDisk(Configuration conf,
-                                  long ramDiskCapacity) {
-    conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
-    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
-    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
-    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
-    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
-    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
-    LazyPersistTestCase.initCacheManipulator();
-
-    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
-  }
-
   private final ErasureCodingPolicy ecPolicy =
       StripedFileTestUtil.getDefaultECPolicy();
   private final int dataBlocks = ecPolicy.getNumDataUnits();
@@ -486,160 +455,6 @@ public class TestBalancer {
   }
 
   /**
-   * Make sure that the balancer can't move pinned blocks.
-   * If favoredNodes are specified when creating a file, its blocks are
-   * pinned with the sticky bit.
-   * @throws Exception
-   */
-  @Test(timeout=100000)
-  public void testBalancerWithPinnedBlocks() throws Exception {
-    // This test assumes the sticky-bit based block pinning mechanism, which
-    // is available only on Linux/Unix. It can be unblocked on Windows once
-    // HDFS-7759 provides a different mechanism for Windows.
-    assumeNotWindows();
-
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
-
-    long[] capacities =  new long[] { CAPACITY, CAPACITY };
-    String[] hosts = {"host0", "host1"};
-    String[] racks = { RACK0, RACK1 };
-    int numOfDatanodes = capacities.length;
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
-        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
-
-    cluster.waitActive();
-    client = NameNodeProxies.createProxy(conf,
-        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
-    // fill up the cluster to be 80% full
-    long totalCapacity = sum(capacities);
-    long totalUsedSpace = totalCapacity * 8 / 10;
-    InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
-    for (int i = 0; i < favoredNodes.length; i++) {
-      // DFSClient will attempt reverse lookup. In case it resolves
-      // "127.0.0.1" to "localhost", we manually specify the hostname.
-      int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
-      favoredNodes[i] = new InetSocketAddress(hosts[i], port);
-    }
-
-    DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
-        totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
-        (short) numOfDatanodes, 0, false, favoredNodes);
-
-    // start up an empty node with the same capacity
-    cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
-        new long[] { CAPACITY });
-
-    totalCapacity += CAPACITY;
-
-    // run balancer and validate results
-    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
-
-    // start rebalancing
-    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-  }
-
-  /**
-   * Verify the balancer won't violate the default block placement policy.
-   * @throws Exception
-   */
-  @Test(timeout=100000)
-  public void testRackPolicyAfterBalance() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-    long[] capacities =  new long[] { CAPACITY, CAPACITY };
-    String[] hosts = {"host0", "host1"};
-    String[] racks = { RACK0, RACK1 };
-    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
-        null, CAPACITY, "host2", RACK1, null);
-  }
-
-  /**
-   * Verify the balancer won't violate the upgrade domain block placement policy.
-   * @throws Exception
-   */
-  @Test(timeout=100000)
-  public void testUpgradeDomainPolicyAfterBalance() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
-        BlockPlacementPolicyWithUpgradeDomain.class,
-        BlockPlacementPolicy.class);
-    long[] capacities =  new long[] { CAPACITY, CAPACITY, CAPACITY };
-    String[] hosts = {"host0", "host1", "host2"};
-    String[] racks = { RACK0, RACK1, RACK1 };
-    String[] UDs = { "ud0", "ud1", "ud2" };
-    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
-        UDs, CAPACITY, "host3", RACK2, "ud2");
-  }
-
-  private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
-      long[] capacities, String[] hosts, String[] racks, String[] UDs,
-      long newCapacity, String newHost, String newRack, String newUD)
-          throws Exception {
-    int numOfDatanodes = capacities.length;
-
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
-        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
-    DatanodeManager dm = cluster.getNamesystem().getBlockManager().
-        getDatanodeManager();
-    if (UDs != null) {
-      for(int i = 0; i < UDs.length; i++) {
-        DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
-        dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
-      }
-    }
-
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
-      // fill up the cluster to be 80% full
-      long totalCapacity = sum(capacities);
-      long totalUsedSpace = totalCapacity * 8 / 10;
-
-      final long fileSize = totalUsedSpace / numOfDatanodes;
-      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
-          fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
-
-      // start up an empty node with the same capacity on the same rack as the
-      // pinned host.
-      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
-          new String[] { newHost }, new long[] { newCapacity });
-      if (newUD != null) {
-        DatanodeID newId = cluster.getDataNodes().get(
-            numOfDatanodes).getDatanodeId();
-        dm.getDatanode(newId).setUpgradeDomain(newUD);
-      }
-      totalCapacity += newCapacity;
-
-      // run balancer and validate results
-      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
-
-      // start rebalancing
-      Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-      Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-      BlockPlacementPolicy placementPolicy =
-          cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
-      List<LocatedBlock> locatedBlocks = client.
-          getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
-      for (LocatedBlock locatedBlock : locatedBlocks) {
-        BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
-            locatedBlock.getLocations(), numOfDatanodes);
-        assertTrue(status.isPlacementPolicySatisfied());
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
    * Wait until balanced: each datanode gives utilization within
    * BALANCE_ALLOWED_VARIANCE of average
    * @throws IOException
@@ -1598,144 +1413,6 @@ public class TestBalancer {
         CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
   }
 
-
-  @Test(timeout = 100000)
-  public void testMaxIterationTime() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-    int blockSize = 10*1024*1024; // 10MB block size
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
-    // limit the worker thread count of Balancer to have only 1 queue per DN
-    conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
-    // limit the bandwidth to 4MB per sec to emulate slow block moves
-    conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
-        4 * 1024 * 1024);
-    // set client socket timeout to have an IN_PROGRESS notification back from
-    // the DataNode about the copy every second.
-    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
-    // set max iteration time to 2 seconds to time out before moving any block
-    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
-    // setup the cluster
-    final long capacity = 10L * blockSize;
-    final long[] dnCapacities = new long[] {capacity, capacity};
-    final short rep = 1;
-    final long seed = 0xFAFAFA;
-    cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(0)
-        .build();
-    try {
-      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
-      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
-      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
-      cluster.waitClusterUp();
-      cluster.waitActive();
-      final Path path = new Path("/testMaxIterationTime.dat");
-      DistributedFileSystem fs = cluster.getFileSystem();
-      // fill the DN to 40%
-      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
-      // start a new DN
-      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
-      cluster.triggerHeartbeats();
-      // setup Balancer and run one iteration
-      List<NameNodeConnector> connectors = Collections.emptyList();
-      try {
-        BalancerParameters bParams = BalancerParameters.DEFAULT;
-        // set maxIdleIterations to 1 for NO_MOVE_PROGRESS to be
-        // reported when there is no block move
-        connectors = NameNodeConnector.newNameNodeConnectors(
-            DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
-            Balancer.BALANCER_ID_PATH, conf, 1);
-        for (NameNodeConnector nnc : connectors) {
-          LOG.info("NNC to work on: " + nnc);
-          Balancer b = new Balancer(nnc, bParams, conf);
-          Result r = b.runOneIteration();
-          // Since no block can be moved in 2 seconds (i.e.,
-          // 4MB/s * 2s = 8MB < 10MB), NO_MOVE_PROGRESS will be reported.
-          // When a block move is not canceled in 2 seconds properly and then
-          // a block is moved unexpectedly, IN_PROGRESS will be reported.
-          assertEquals("We expect ExitStatus.NO_MOVE_PROGRESS to be reported.",
-              ExitStatus.NO_MOVE_PROGRESS, r.getExitStatus());
-          assertEquals(0, r.getBlocksMoved());
-        }
-      } finally {
-        for (NameNodeConnector nnc : connectors) {
-          IOUtils.cleanupWithLogger(null, nnc);
-        }
-      }
-    } finally {
-      cluster.shutdown(true, true);
-    }
-  }
-
-  /*
-   * Test Balancer with RAM_DISK configured.
-   * One DN has two files on RAM_DISK, the other DN has none.
-   * Then verify that the balancer does not migrate files on RAM_DISK across
-   * DNs.
-   */
-  @Test(timeout=300000)
-  public void testBalancerWithRamDisk() throws Exception {
-    final int SEED = 0xFADED;
-    final short REPL_FACT = 1;
-    Configuration conf = new Configuration();
-
-    final int defaultRamDiskCapacity = 10;
-    final long ramDiskStorageLimit =
-      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
-      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
-    final long diskStorageLimit =
-      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
-      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
-
-    initConfWithRamDisk(conf, ramDiskStorageLimit);
-
-    cluster = new MiniDFSCluster
-      .Builder(conf)
-      .numDataNodes(1)
-      .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
-      .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
-      .build();
-
-    cluster.waitActive();
-    // Create a few files on RAM_DISK
-    final String METHOD_NAME = GenericTestUtils.getMethodName();
-    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-
-    DistributedFileSystem fs = cluster.getFileSystem();
-    DFSClient client = fs.getClient();
-    DFSTestUtil.createFile(fs, path1, true,
-      DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-      DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-    DFSTestUtil.createFile(fs, path2, true,
-      DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-      DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-
-    // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(6 * 1000);
-
-    // Add another fresh DN with the same type/capacity without files on RAM_DISK
-    StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
-    long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
-        diskStorageLimit}};
-    cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
-      null, null, storageCapacities, null, false, false, false, null);
-
-    cluster.triggerHeartbeats();
-    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-
-    // Run Balancer
-    final BalancerParameters p = BalancerParameters.DEFAULT;
-    final int r = Balancer.run(namenodes, p, conf);
-
-    // Validate no RAM_DISK block should be moved
-    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-
-    // Verify files are still on RAM_DISK
-    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
-    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
-  }
-
   /**
    * Check that the balancer exits when there is an unfinalized upgrade.
    */
@@ -1799,66 +1476,6 @@ public class TestBalancer {
   }
 
   /**
-   * Test a special case: two replicas of the same block should not end up
-   * on the same node.
-   * We have 2 nodes.
-   * We have a block in (DN0,SSD) and (DN1,DISK).
-   * Replica in (DN0,SSD) should not be moved to (DN1,SSD).
-   * Otherwise DN1 has 2 replicas.
-   */
-  @Test(timeout=100000)
-  public void testTwoReplicaShouldNotInSameDN() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-
-    int blockSize = 5 * 1024 * 1024 ;
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
-        1L);
-
-    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
-
-    int numOfDatanodes =2;
-    cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2)
-        .racks(new String[]{"/default/rack0", "/default/rack0"})
-        .storagesPerDatanode(2)
-        .storageTypes(new StorageType[][]{
-            {StorageType.SSD, StorageType.DISK},
-            {StorageType.SSD, StorageType.DISK}})
-        .storageCapacities(new long[][]{
-            {100 * blockSize, 20 * blockSize},
-            {20 * blockSize, 100 * blockSize}})
-        .build();
-    cluster.waitActive();
-
-    //set "/bar" directory with ONE_SSD storage policy.
-    DistributedFileSystem fs = cluster.getFileSystem();
-    Path barDir = new Path("/bar");
-    fs.mkdir(barDir,new FsPermission((short)777));
-    fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
-
-    // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
-    // and (DN0,DISK) and (DN1,SSD) are about 15% full.
-    long fileLen  = 30 * blockSize;
-    // fooFile has the ONE_SSD policy. So
-    // (DN0,SSD) and (DN1,DISK) hold 2 replicas belonging to the same block.
-    // (DN0,DISK) and (DN1,SSD) hold 2 replicas belonging to the same block.
-    Path fooFile = new Path(barDir, "foo");
-    createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
-    // update space info
-    cluster.triggerHeartbeats();
-
-    BalancerParameters p = BalancerParameters.DEFAULT;
-    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    final int r = Balancer.run(namenodes, p, conf);
-
-    // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
-    // already has one. Otherwise DN1 will have 2 replicas.
-    // For same reason, no replicas were moved.
-    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-  }
-
-  /**
   * Test running many balancers simultaneously.
    *
    * Case-1: First balancer is running. Now, running second one should get
@@ -1929,121 +1546,6 @@ public class TestBalancer {
         ExitStatus.SUCCESS.getExitCode(), exitCode);
   }
 
-  /** Balancer should not move blocks with size < minBlockSize. */
-  @Test(timeout=60000)
-  public void testMinBlockSizeAndSourceNodes() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-
-    final short replication = 3;
-    final long[] lengths = {10, 10, 10, 10};
-    final long[] capacities = new long[replication];
-    final long totalUsed = capacities.length * sum(lengths);
-    Arrays.fill(capacities, 1000);
-
-    cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(capacities.length)
-        .simulatedCapacities(capacities)
-        .build();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
-    cluster.waitActive();
-    client = NameNodeProxies.createProxy(conf, dfs.getUri(),
-        ClientProtocol.class).getProxy();
-
-    // write several small files into the cluster
-    for(int i = 0; i < lengths.length; i++) {
-      final long size = lengths[i];
-      final Path p = new Path("/file" + i + "_size" + size);
-      try(OutputStream out = dfs.create(p)) {
-        for(int j = 0; j < size; j++) {
-          out.write(j);
-        }
-      }
-    }
-
-    // start up an empty node with the same capacity
-    cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
-    LOG.info("capacities    = " + Arrays.toString(capacities));
-    LOG.info("totalUsedSpace= " + totalUsed);
-    LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
-    waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
-
-    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-
-    { // run Balancer with min-block-size=50
-      final BalancerParameters p = Balancer.Cli.parse(new String[] {
-          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
-          "-threshold", "1"
-      });
-      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
-      assertEquals(p.getThreshold(), 1.0, 0.001);
-
-      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
-      final int r = Balancer.run(namenodes, p, conf);
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-    }
-
-    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
-
-    { // run Balancer with empty nodes as source nodes
-      final Set<String> sourceNodes = new HashSet<>();
-      final List<DataNode> datanodes = cluster.getDataNodes();
-      for(int i = capacities.length; i < datanodes.size(); i++) {
-        sourceNodes.add(datanodes.get(i).getDisplayName());
-      }
-      final BalancerParameters p = Balancer.Cli.parse(new String[] {
-          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
-          "-threshold", "1",
-          "-source", StringUtils.join(sourceNodes, ',')
-      });
-      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
-      assertEquals(p.getThreshold(), 1.0, 0.001);
-      assertEquals(p.getSourceNodes(), sourceNodes);
-
-      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
-      final int r = Balancer.run(namenodes, p, conf);
-      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
-    }
-
-    { // run Balancer with a filled node as a source node
-      final Set<String> sourceNodes = new HashSet<>();
-      final List<DataNode> datanodes = cluster.getDataNodes();
-      sourceNodes.add(datanodes.get(0).getDisplayName());
-      final BalancerParameters p = Balancer.Cli.parse(new String[] {
-          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
-          "-threshold", "1",
-          "-source", StringUtils.join(sourceNodes, ',')
-      });
-      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
-      assertEquals(p.getThreshold(), 1.0, 0.001);
-      assertEquals(p.getSourceNodes(), sourceNodes);
-
-      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
-      final int r = Balancer.run(namenodes, p, conf);
-      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
-    }
-
-    { // run Balancer with all filled node as source nodes
-      final Set<String> sourceNodes = new HashSet<>();
-      final List<DataNode> datanodes = cluster.getDataNodes();
-      for(int i = 0; i < capacities.length; i++) {
-        sourceNodes.add(datanodes.get(i).getDisplayName());
-      }
-      final BalancerParameters p = Balancer.Cli.parse(new String[] {
-          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
-          "-threshold", "1",
-          "-source", StringUtils.join(sourceNodes, ',')
-      });
-      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
-      assertEquals(p.getThreshold(), 1.0, 0.001);
-      assertEquals(p.getSourceNodes(), sourceNodes);
-
-      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
-      final int r = Balancer.run(namenodes, p, conf);
-      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-    }
-  }
-
   public void integrationTestWithStripedFile(Configuration conf) throws Exception {
     initConfWithStripe(conf);
     doTestBalancerWithStripedFile(conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java
new file mode 100644
index 0000000..d1e3f73
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.fs.StorageType.DEFAULT;
+import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Some long-running Balancer tasks.
+ */
+public class TestBalancerLongRunningTasks {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestBalancerLongRunningTasks.class);
+
+  static {
+    GenericTestUtils.setLogLevel(Balancer.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
+  }
+
+  private final static long CAPACITY = 5000L;
+  private final static String RACK0 = "/rack0";
+  private final static String RACK1 = "/rack1";
+  private final static String RACK2 = "/rack2";
+  private final static String FILE_NAME = "/tmp.txt";
+  private final static Path FILE_PATH = new Path(FILE_NAME);
+  private MiniDFSCluster cluster;
+
+  @After
+  public void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private ClientProtocol client;
+
+  static final int DEFAULT_BLOCK_SIZE = 100;
+  static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
+
+  static {
+    initTestSetup();
+  }
+
+  public static void initTestSetup() {
+    // do not create id file since it occupies the disk space
+    NameNodeConnector.setWrite2IdFile(false);
+  }
+
+  static void initConf(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1L);
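+    // SimulatedFSDataset keeps block data in memory, so the tiny 100-byte
+    // blocks used by these tests never touch a real disk.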
+    SimulatedFSDataset.setFactory(conf);
+
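+    // Keep iterations short for tests: a small moved-blocks window, a
+    // 1-byte minimum block size so every block is a move candidate, and a
+    // short no-move interval.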
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5 * 1000);
+  }
+
+  static void initConfWithRamDisk(Configuration conf,
+      long ramDiskCapacity) {
+    conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5 * 1000);
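+    // Install a test cache manipulator so lazy-persist writes to RAM_DISK
+    // work without locking real native memory.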
+    LazyPersistTestCase.initCacheManipulator();
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+  }
+
+  /**
+   * Test a special case: two replicas of the same block should not end up
+   * on the same node.
+   * We have 2 nodes.
+   * We have a block in (DN0,SSD) and (DN1,DISK).
+   * Replica in (DN0,SSD) should not be moved to (DN1,SSD).
+   * Otherwise DN1 has 2 replicas.
+   */
+  @Test(timeout = 100000)
+  public void testTwoReplicaShouldNotInSameDN() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+
+    int blockSize = 5 * 1024 * 1024;
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1L);
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+
+    int numOfDatanodes = 2;
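+    // Mirrored capacities (large SSD on DN0, large DISK on DN1) make the
+    // SSD tier look unbalanced, tempting the balancer to move the DN0 SSD
+    // replica to DN1.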
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .racks(new String[]{"/default/rack0", "/default/rack0"})
+        .storagesPerDatanode(2)
+        .storageTypes(new StorageType[][]{
+            {StorageType.SSD, StorageType.DISK},
+            {StorageType.SSD, StorageType.DISK}})
+        .storageCapacities(new long[][]{
+            {100 * blockSize, 20 * blockSize},
+            {20 * blockSize, 100 * blockSize}})
+        .build();
+    cluster.waitActive();
+
+    //set "/bar" directory with ONE_SSD storage policy.
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Path barDir = new Path("/bar");
+    fs.mkdir(barDir, new FsPermission((short) 777));
+    fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
+    // and (DN0,DISK) and (DN1,SSD) are about 15% full.
+    long fileLen = 30 * blockSize;
+    // fooFile has the ONE_SSD policy. So
+    // (DN0,SSD) and (DN1,DISK) hold 2 replicas belonging to the same block.
+    // (DN0,DISK) and (DN1,SSD) hold 2 replicas belonging to the same block.
+    Path fooFile = new Path(barDir, "foo");
+    TestBalancer.createFile(cluster, fooFile, fileLen, (short) numOfDatanodes,
+        0);
+    // update space info
+    cluster.triggerHeartbeats();
+
+    BalancerParameters p = BalancerParameters.DEFAULT;
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    final int r = Balancer.run(namenodes, p, conf);
+
+    // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
+    // already has one. Otherwise DN1 will have 2 replicas.
+    // For same reason, no replicas were moved.
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+  }
+
+  /*
+   * Test Balancer with RAM_DISK configured.
+   * One DN has two files on RAM_DISK, the other DN has none.
+   * Then verify that the balancer does not migrate files on RAM_DISK across
+   * DNs.
+   */
+  @Test(timeout = 300000)
+  public void testBalancerWithRamDisk() throws Exception {
+    final int seed = 0xFADED;
+    final short replicationFactor = 1;
+    Configuration conf = new Configuration();
+
+    final int defaultRamDiskCapacity = 10;
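+    // Both limits below leave room for 10 blocks plus just under one more.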
+    final long ramDiskStorageLimit =
+        ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+            (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+    final long diskStorageLimit =
+        ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+            (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+
+    initConfWithRamDisk(conf, ramDiskStorageLimit);
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(1)
+        .storageCapacities(new long[]{ramDiskStorageLimit, diskStorageLimit})
+        .storageTypes(new StorageType[]{RAM_DISK, DEFAULT})
+        .build();
+
+    cluster.waitActive();
+    // Create a few files on RAM_DISK
+    final String methodName = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + methodName + ".01.dat");
+    final Path path2 = new Path("/" + methodName + ".02.dat");
+
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient dfsClient = fs.getClient();
+    DFSTestUtil.createFile(fs, path1, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, replicationFactor, seed, true);
+    DFSTestUtil.createFile(fs, path2, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, replicationFactor, seed, true);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * 1000);
+
+    // Add another fresh DN with the same type/capacity without files on
+    // RAM_DISK
+    StorageType[][] storageTypes = new StorageType[][]{{RAM_DISK, DEFAULT}};
+    long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
+            diskStorageLimit}};
+    cluster.startDataNodes(conf, replicationFactor, storageTypes, true, null,
+        null, null, storageCapacities, null, false, false, false, null);
+
+    cluster.triggerHeartbeats();
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+    // Run Balancer
+    final BalancerParameters p = BalancerParameters.DEFAULT;
+    final int r = Balancer.run(namenodes, p, conf);
+
+    // Validate no RAM_DISK block should be moved
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+    // Verify files are still on RAM_DISK
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, dfsClient, path1, RAM_DISK);
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, dfsClient, path2, RAM_DISK);
+  }
+
+  /**
+   * Balancer should not move blocks with size < minBlockSize.
+   */
+  @Test(timeout = 60000)
+  public void testMinBlockSizeAndSourceNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    final short replication = 3;
+    final long[] lengths = {10, 10, 10, 10};
+    final long[] capacities = new long[replication];
+    final long totalUsed = capacities.length * TestBalancer.sum(lengths);
+    Arrays.fill(capacities, 1000);
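+    // 4 files x 10 bytes x 3 replicas = 120 bytes of used space in total.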
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .simulatedCapacities(capacities)
+        .build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+        ClientProtocol.class).getProxy();
+
+    // write several small files into the cluster
+    for (int i = 0; i < lengths.length; i++) {
+      final long size = lengths[i];
+      final Path p = new Path("/file" + i + "_size" + size);
+      try (OutputStream out = dfs.create(p)) {
+        for (int j = 0; j < size; j++) {
+          out.write(j);
+        }
+      }
+    }
+
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+    LOG.info("capacities    = " + Arrays.toString(capacities));
+    LOG.info("totalUsedSpace= " + totalUsed);
+    LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+    TestBalancer.waitForHeartBeat(totalUsed,
+        2 * capacities[0] * capacities.length, client, cluster);
+
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+    { // run Balancer with min-block-size=50
+      final BalancerParameters p = Balancer.Cli.parse(new String[]{
+          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+          "-threshold", "1"
+      });
+      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
+      assertEquals(p.getThreshold(), 1.0, 0.001);
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    }
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+    { // run Balancer with empty nodes as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for (int i = capacities.length; i < datanodes.size(); i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
+      }
+      final BalancerParameters p = Balancer.Cli.parse(new String[]{
+          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+          "-threshold", "1",
+          "-source", StringUtils.join(sourceNodes, ',')
+      });
+      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
+      assertEquals(p.getThreshold(), 1.0, 0.001);
+      assertEquals(p.getSourceNodes(), sourceNodes);
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
+
+    { // run Balancer with a filled node as a source node
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      sourceNodes.add(datanodes.get(0).getDisplayName());
+      final BalancerParameters p = Balancer.Cli.parse(new String[]{
+          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+          "-threshold", "1",
+          "-source", StringUtils.join(sourceNodes, ',')
+      });
+      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
+      assertEquals(p.getThreshold(), 1.0, 0.001);
+      assertEquals(p.getSourceNodes(), sourceNodes);
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
+
+    { // run Balancer with all filled node as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for (int i = 0; i < capacities.length; i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
+      }
+      final BalancerParameters p = Balancer.Cli.parse(new String[]{
+          "-policy", BalancingPolicy.Node.INSTANCE.getName(),
+          "-threshold", "1",
+          "-source", StringUtils.join(sourceNodes, ',')
+      });
+      assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
+      assertEquals(p.getThreshold(), 1.0, 0.001);
+      assertEquals(p.getSourceNodes(), sourceNodes);
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+    }
+  }
+
+  /**
+ * Verify the balancer won't violate the upgrade domain block placement policy.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testUpgradeDomainPolicyAfterBalance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyWithUpgradeDomain.class,
+        BlockPlacementPolicy.class);
+    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY};
+    String[] hosts = {"host0", "host1", "host2"};
+    String[] racks = {RACK0, RACK1, RACK1};
+    String[] uds = {"ud0", "ud1", "ud2"};
+    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
+        uds, CAPACITY, "host3", RACK2, "ud2");
+  }
+
+  /**
+ * Verify the balancer won't violate the default block placement policy.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testRackPolicyAfterBalance() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    long[] capacities = new long[]{CAPACITY, CAPACITY};
+    String[] hosts = {"host0", "host1"};
+    String[] racks = {RACK0, RACK1};
+    runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
+        null, CAPACITY, "host2", RACK1, null);
+  }
+
+  private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
+      long[] capacities, String[] hosts, String[] racks, String[] UDs,
+      long newCapacity, String newHost, String newRack, String newUD)
+      throws Exception {
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
+    DatanodeManager dm = cluster.getNamesystem().getBlockManager().
+        getDatanodeManager();
+    if (UDs != null) {
+      for (int i = 0; i < UDs.length; i++) {
+        DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
+        dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
+      }
+    }
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      long totalCapacity = TestBalancer.sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+
+      final long fileSize = totalUsedSpace / numOfDatanodes;
+      DFSTestUtil.createFile(cluster.getFileSystem(0), FILE_PATH, false, 1024,
+          fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
+
+      // start up an empty node with the same capacity on the same rack as the
+      // pinned host.
+      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+          new String[]{newHost}, new long[]{newCapacity});
+      if (newUD != null) {
+        DatanodeID newId = cluster.getDataNodes().get(
+            numOfDatanodes).getDatanodeId();
+        dm.getDatanode(newId).setUpgradeDomain(newUD);
+      }
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      TestBalancer.waitForHeartBeat(totalUsedSpace,
+          totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+      Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+      BlockPlacementPolicy placementPolicy =
+          cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
+      List<LocatedBlock> locatedBlocks = client.
+          getBlockLocations(FILE_NAME, 0, fileSize).getLocatedBlocks();
+      for (LocatedBlock locatedBlock : locatedBlocks) {
+        BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
+            locatedBlock.getLocations(), numOfDatanodes);
+        assertTrue(status.isPlacementPolicySatisfied());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Make sure that the balancer can't move pinned blocks.
+   * If favoredNodes are specified when creating a file, its blocks are
+   * pinned with the sticky bit.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    // This test assumes the sticky-bit based block pinning mechanism, which
+    // is available only on Linux/Unix. It can be unblocked on Windows once
+    // HDFS-7759 provides a different mechanism for Windows.
+    assumeNotWindows();
+
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+
+    long[] capacities = new long[]{CAPACITY, CAPACITY};
+    String[] hosts = {"host0", "host1"};
+    String[] racks = {RACK0, RACK1};
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
+
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+    // fill up the cluster to be 80% full
+    long totalCapacity = TestBalancer.sum(capacities);
+    long totalUsedSpace = totalCapacity * 8 / 10;
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+    for (int i = 0; i < favoredNodes.length; i++) {
+      // DFSClient will attempt reverse lookup. In case it resolves
+      // "127.0.0.1" to "localhost", we manually specify the hostname.
+      int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
+      favoredNodes[i] = new InetSocketAddress(hosts[i], port);
+    }
+
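+    // With block pinning enabled, writing to these favored nodes pins the
+    // resulting replicas so the balancer must leave them in place.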
+    DFSTestUtil.createFile(cluster.getFileSystem(0), FILE_PATH, false, 1024,
+        totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+        (short) numOfDatanodes, 0, false, favoredNodes);
+
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, 1, true, null, new String[]{RACK2},
+        new long[]{CAPACITY});
+
+    totalCapacity += CAPACITY;
+
+    // run balancer and validate results
+    TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
+        cluster);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+  }
+
+  @Test(timeout = 100000)
+  public void testMaxIterationTime() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    int blockSize = 10 * 1024 * 1024; // 10MB block size
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+    // limit the worker thread count of Balancer to have only 1 queue per DN
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
+    // limit the bandwidth to 4MB per sec to emulate slow block moves
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        4 * 1024 * 1024);
+    // set client socket timeout to have an IN_PROGRESS notification back from
+    // the DataNode about the copy every second.
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
+    // set max iteration time to 500 ms to time out before moving any block
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 500L);
+    // setup the cluster
+    final long capacity = 10L * blockSize;
+    final long[] dnCapacities = new long[]{capacity, capacity};
+    final short rep = 1;
+    final long seed = 0xFAFAFA;
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .build();
+    try {
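+      // Use replication factor 1 so the test file fits entirely on the
+      // first DataNode and the DataNode added later starts out empty.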
+      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.waitClusterUp();
+      cluster.waitActive();
+      final Path path = new Path("/testMaxIterationTime.dat");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // fill the DN to 40%
+      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
+      // start a new DN
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.triggerHeartbeats();
+      // setup Balancer and run one iteration
+      List<NameNodeConnector> connectors = Collections.emptyList();
+      try {
+        BalancerParameters bParams = BalancerParameters.DEFAULT;
+        // set maxIdleIterations to 1 for NO_MOVE_PROGRESS to be
+        // reported when there is no block move
+        connectors = NameNodeConnector.newNameNodeConnectors(
+            DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
+            Balancer.BALANCER_ID_PATH, conf, 1);
+        for (NameNodeConnector nnc : connectors) {
+          LOG.info("NNC to work on: " + nnc);
+          Balancer b = new Balancer(nnc, bParams, conf);
+          Balancer.Result r = b.runOneIteration();
+          // Since no block can be moved in 500 milliseconds (i.e.,
+          // 4MB/s * 0.5s = 2MB < 10MB), NO_MOVE_PROGRESS will be reported.
+          // If a block move is not canceled within 500 ms and a block is
+          // then moved unexpectedly, IN_PROGRESS will be reported; this is
+          // a highly unlikely case. See HDFS-15989.
+          assertEquals("We expect ExitStatus.NO_MOVE_PROGRESS to be reported.",
+              ExitStatus.NO_MOVE_PROGRESS, r.getExitStatus());
+          assertEquals(0, r.getBlocksMoved());
+        }
+      } finally {
+        for (NameNodeConnector nnc : connectors) {
+          IOUtils.cleanupWithLogger(null, nnc);
+        }
+      }
+    } finally {
+      cluster.shutdown(true, true);
+    }
+  }
+
+}
