Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 06:05:54 UTC

[23/49] hadoop git commit: HDFS-9735. DiskBalancer: Refactor moveBlockAcrossStorage to be used by disk balancer. Contributed by Anu Engineer.

HDFS-9735. DiskBalancer: Refactor moveBlockAcrossStorage to be used by disk balancer. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7820737c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7820737c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7820737c

Branch: refs/heads/trunk
Commit: 7820737cfa178d9de1bcbb1e99b9677d70901914
Parents: 0506770
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Apr 11 15:58:06 2016 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:20:24 2016 -0700

----------------------------------------------------------------------
 .../server/datanode/fsdataset/FsDatasetSpi.java | 11 +++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 95 +++++++++++++++-----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 13 +++
 .../server/datanode/SimulatedFSDataset.java     |  7 ++
 .../extdataset/ExternalDatasetImpl.java         |  8 ++
 .../diskbalancer/DiskBalancerTestUtil.java      | 62 ++++++++++---
 .../diskbalancer/TestDiskBalancerRPC.java       | 53 +++++++++--
 7 files changed, 210 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 277b271..eeab098 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -628,4 +628,15 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Confirm whether the block is deleting
    */
   boolean isDeletingBlock(String bpid, long blockId);
+
+  /**
+   * Moves a given block from one volume to another volume. This is used by disk
+   * balancer.
+   *
+   * @param block       - ExtendedBlock
+   * @param destination - Destination volume
+   * @return Old replica info
+   */
+  ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
+      FsVolumeSpi destination) throws IOException;
 }

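For reference, a minimal caller-side sketch of the new FsDatasetSpi method. All class and method names below, other than the FsDatasetSpi API itself, are hypothetical and not part of this commit; as the FsDatasetImpl change further down shows, the replica must be FINALIZED.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** Hypothetical caller-side helper; illustration only, not part of this commit. */
    final class ReplicaMoverSketch {
      private ReplicaMoverSketch() {
      }

      /** Moves one finalized replica to the given volume and returns the old replica info. */
      static ReplicaInfo relocate(FsDatasetSpi<?> dataset, ExtendedBlock block,
          FsVolumeSpi destination) throws IOException {
        // Throws ReplicaNotFoundException (an IOException) if the replica is not FINALIZED.
        return dataset.moveBlockAcrossVolumes(block, destination);
      }
    }
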
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index b042297..2b40538 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -947,29 +947,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
-      File oldBlockFile = replicaInfo.getBlockFile();
-      File oldMetaFile = replicaInfo.getMetaFile();
-      FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
-      // Copy files to temp dir first
-      File[] blockFiles = copyBlockFiles(block.getBlockId(),
-          block.getGenerationStamp(), oldMetaFile, oldBlockFile,
-          targetVolume.getTmpDir(block.getBlockPoolId()),
-          replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
-
-      ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
-          replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
-          targetVolume, blockFiles[0].getParentFile(), 0);
-      newReplicaInfo.setNumBytes(blockFiles[1].length());
-      // Finalize the copied files
-      newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-      synchronized (this) {
-        // Increment numBlocks here as this block moved without knowing to BPS
-        FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
-        volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
-      }
-
-      removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
-          oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+      moveBlock(block, replicaInfo, volumeRef);
     } finally {
       if (volumeRef != null) {
         volumeRef.close();
@@ -981,6 +959,77 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
+   * Moves a block from a given volume to another.
+   *
+   * @param block       - Extended Block
+   * @param replicaInfo - ReplicaInfo
+   * @param volumeRef   - Volume Ref - Closed by caller.
+   * @return newReplicaInfo
+   * @throws IOException
+   */
+  private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
+                                FsVolumeReference volumeRef) throws
+      IOException {
+    File oldBlockFile = replicaInfo.getBlockFile();
+    File oldMetaFile = replicaInfo.getMetaFile();
+    FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
+    // Copy files to temp dir first
+    File[] blockFiles = copyBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        targetVolume.getTmpDir(block.getBlockPoolId()),
+        replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
+
+    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+        targetVolume, blockFiles[0].getParentFile(), 0);
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    // Finalize the copied files
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+    synchronized (this) {
+      // Increment numBlocks here as this block moved without knowing to BPS
+      FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
+      volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+    }
+
+    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+    return newReplicaInfo;
+  }
+
+  /**
+   * Moves a given block from one volume to another volume. This is used by disk
+   * balancer.
+   *
+   * @param block       - ExtendedBlock
+   * @param destination - Destination volume
+   * @return Old replica info
+   */
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
+      destination) throws IOException {
+    ReplicaInfo replicaInfo = getReplicaInfo(block);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+    }
+
+    FsVolumeReference volumeRef = null;
+
+    synchronized (this) {
+      volumeRef = destination.obtainReference();
+    }
+
+    try {
+      moveBlock(block, replicaInfo, volumeRef);
+    } finally {
+      if (volumeRef != null) {
+        volumeRef.close();
+      }
+    }
+    return replicaInfo;
+  }
+
+  /**
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
    *

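The implementation above pins the destination volume by obtaining a reference under the dataset lock and releasing it in a finally block, so the volume cannot be removed while the copy is in flight. A stand-alone sketch of that reference-counting discipline, with hypothetical names that are not part of this commit (FsVolumeReference is Closeable, so try-with-resources works):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** Hypothetical sketch of the volume reference-counting pattern; not part of this commit. */
    final class VolumeRefSketch {
      private VolumeRefSketch() {
      }

      /** Runs an action while holding a reference that pins the volume. */
      static void withVolumePinned(FsVolumeSpi volume, Runnable action)
          throws IOException {
        // try-with-resources releases the reference even if the action throws.
        try (FsVolumeReference ref = volume.obtainReference()) {
          action.run();
        }
      }
    }
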
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 68e2537..4a446d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -711,6 +711,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
                     actualBlockDir.getPath());
                 continue;
               }
+
+              File blkFile = getBlockFile(bpid, block);
+              File metaFile = FsDatasetUtil.findMetaFile(blkFile);
+              block.setGenerationStamp(
+                  Block.getGenerationStamp(metaFile.getName()));
+              block.setNumBytes(blkFile.length());
+
               LOG.trace("nextBlock({}, {}): advancing to {}",
                   storageID, bpid, block);
               return block;
@@ -732,6 +739,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
       }
     }
 
+    private File getBlockFile(String bpid, ExtendedBlock blk)
+        throws IOException {
+      return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(bpid),
+          blk.getBlockId()).toString() + "/" + blk.getBlockName());
+    }
+
     @Override
     public boolean atEnd() {
       return state.atEnd;

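Because the iterator now fills in the generation stamp and length for each block it returns, a consumer can use those fields directly without a second lookup. A hypothetical sketch of walking one block pool on a volume (names are assumptions, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    /** Hypothetical sketch; not part of this commit. */
    final class BlockWalkSketch {
      private BlockWalkSketch() {
      }

      /** Sums the on-disk bytes of all blocks in one block pool on a volume. */
      static long totalBlockBytes(FsVolumeSpi volume, String bpid) throws IOException {
        long total = 0;
        // BlockIterator is Closeable; try-with-resources releases the iterator when done.
        try (FsVolumeSpi.BlockIterator iter =
            volume.newBlockIterator(bpid, "blockWalkSketch")) {
          while (!iter.atEnd()) {
            ExtendedBlock block = iter.nextBlock();
            if (block != null) {
              // numBytes is populated from the block file length (see the hunk above).
              total += block.getNumBytes();
            }
          }
        }
        return total;
      }
    }
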
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 25034c6..24f4a52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1359,5 +1359,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
+    FsVolumeSpi destination) throws IOException {
+    return null;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index c872e61..8518ddd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -442,4 +442,12 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean isDeletingBlock(String bpid, long blockId) {
     return false;
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
+                                            FsVolumeSpi destination)
+      throws IOException {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
index 9613919..43bb184 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
@@ -26,6 +29,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
 import org.apache.hadoop.util.Time;
 
+import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
 
@@ -53,7 +57,6 @@ public class DiskBalancerTestUtil {
    * Returns a random string.
    *
    * @param length - Number of chars in the string
-   *
    * @return random String
    */
   private String getRandomName(int length) {
@@ -122,7 +125,6 @@ public class DiskBalancerTestUtil {
    * Creates a Random Volume for testing purpose.
    *
    * @param type - StorageType
-   *
    * @return DiskBalancerVolume
    */
   public DiskBalancerVolume createRandomVolume(StorageType type) {
@@ -142,11 +144,9 @@ public class DiskBalancerTestUtil {
   /**
    * Creates a RandomVolumeSet.
    *
-   * @param type -Storage Type
+   * @param type      - Storage Type
    * @param diskCount - How many disks you need.
-   *
    * @return volumeSet
-   *
    * @throws Exception
    */
   public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
@@ -168,9 +168,7 @@ public class DiskBalancerTestUtil {
    *
    * @param diskTypes - Storage types needed in the Node
    * @param diskCount - Disk count - that many disks of each type is created
-   *
    * @return DataNode
-   *
    * @throws Exception
    */
   public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
@@ -195,11 +193,9 @@ public class DiskBalancerTestUtil {
    * Creates a RandomCluster.
    *
    * @param dataNodeCount - How many nodes you need
-   * @param diskTypes - StorageTypes you need in each node
-   * @param diskCount - How many disks you need of each type.
-   *
+   * @param diskTypes     - StorageTypes you need in each node
+   * @param diskCount     - How many disks you need of each type.
    * @return Cluster
-   *
    * @throws Exception
    */
   public DiskBalancerCluster createRandCluster(int dataNodeCount,
@@ -224,4 +220,48 @@ public class DiskBalancerTestUtil {
     return cluster;
   }
 
+  /**
+   * Returns the number of blocks on a volume.
+   *
+   * @param source - Source Volume.
+   * @return Number of Blocks.
+   * @throws IOException
+   */
+  public static int getBlockCount(FsVolumeSpi source) throws IOException {
+    int count = 0;
+    for (String blockPoolID : source.getBlockPoolList()) {
+      FsVolumeSpi.BlockIterator sourceIter =
+          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
+      while (!sourceIter.atEnd()) {
+        ExtendedBlock block = sourceIter.nextBlock();
+        if (block != null) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Moves all blocks to the destination volume.
+   *
+   * @param fsDataset - Dataset
+   * @param source    - Source Volume.
+   * @param dest      - Destination Volume.
+   * @throws IOException
+   */
+  public static void moveAllDataToDestVolume(FsDatasetSpi fsDataset,
+    FsVolumeSpi source, FsVolumeSpi dest) throws IOException {
+
+    for (String blockPoolID : source.getBlockPoolList()) {
+      FsVolumeSpi.BlockIterator sourceIter =
+          source.newBlockIterator(blockPoolID, "TestDiskBalancerSource");
+      while (!sourceIter.atEnd()) {
+        ExtendedBlock block = sourceIter.nextBlock();
+        if (block != null) {
+          fsDataset.moveBlockAcrossVolumes(block, dest);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7820737c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 27cd8eb..81a0609 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -19,35 +19,39 @@ package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
-import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.Result;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
-import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
-import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
-import org.hamcrest.*;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
 import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS;
+import static org.junit.Assert.assertTrue;
 
 public class TestDiskBalancerRPC {
   @Rule
@@ -227,6 +231,45 @@ public class TestDiskBalancerRPC {
     Assert.assertTrue(status.getResult() == NO_PLAN);
   }
 
+  @Test
+  public void testMoveBlockAcrossVolume() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DEFAULT_BLOCK_SIZE = 100;
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    String fileName = "/tmp.txt";
+    Path filePath = new Path(fileName);
+    final int numDatanodes = 1;
+    final int dnIndex = 0;
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes).build();
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+    try {
+      cluster.waitActive();
+      Random r = new Random();
+      FileSystem fs = cluster.getFileSystem(dnIndex);
+      DFSTestUtil.createFile(fs, filePath, 10 * 1024,
+          (short) 1, r.nextLong());
+      DataNode dnNode = cluster.getDataNodes().get(dnIndex);
+      FsDatasetSpi.FsVolumeReferences refs =
+          dnNode.getFSDataset().getFsVolumeReferences();
+      try {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
+            source, dest);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      } finally {
+        refs.close();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
   private class RpcTestHelper {
     private NodePlan plan;
     private int planVersion;

