Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/08/28 03:13:54 UTC

git commit: HDFS-6944. Archival Storage: add retry and termination logic for Mover. Contributed by Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-6584 b7ded466b -> a26aa6bd0


HDFS-6944. Archival Storage: add retry and termination logic for Mover. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a26aa6bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a26aa6bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a26aa6bd

Branch: refs/heads/HDFS-6584
Commit: a26aa6bd0716da89853566961390d711511084e3
Parents: b7ded46
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Aug 27 14:20:54 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Aug 27 14:20:54 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  58 ++++-
 .../hdfs/server/balancer/MovedBlocks.java       |   2 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  | 152 +++++++-----
 .../hdfs/server/mover/TestStorageMover.java     | 247 +++++++++++++++++++
 4 files changed, 381 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
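
In short, the termination logic added here works like this: each Mover pass over a namenode now reports whether any migration work remains, and the driver loop in Mover.run(Collection<URI>, Configuration) keeps cycling through the namenodes, dropping each one once it reports SUCCESS and bailing out immediately on any error status. A simplified, standalone sketch of that control flow (the names below are illustrative stand-ins, not the actual Hadoop classes):

    // Simplified sketch of the retry/termination loop this patch adds to
    // Mover.run(). ExitStatus and runOnePass() are illustrative placeholders.
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class MoverLoopSketch {
      enum ExitStatus { SUCCESS, IN_PROGRESS, IO_EXCEPTION }

      // Stand-in for one Mover pass over a single namenode: SUCCESS means no
      // migration work is left; IN_PROGRESS means moves were scheduled (or
      // some failed) and another round is needed; anything else is fatal.
      static ExitStatus runOnePass(String namenode) {
        return ExitStatus.SUCCESS;
      }

      static int run(List<String> namenodes, long sleepMillis)
          throws InterruptedException {
        List<String> remaining = new ArrayList<>(namenodes);
        while (!remaining.isEmpty()) {
          Collections.shuffle(remaining);
          for (Iterator<String> it = remaining.iterator(); it.hasNext();) {
            ExitStatus r = runOnePass(it.next());
            if (r == ExitStatus.SUCCESS) {
              it.remove();            // this namenode is done, stop revisiting it
            } else if (r != ExitStatus.IN_PROGRESS) {
              return 1;               // error status: give up immediately
            }
          }
          Thread.sleep(sleepMillis);  // let in-flight moves land before retrying
        }
        return 0;                     // every namenode finished: terminate cleanly
      }
    }

In practice a Mover pass returns IN_PROGRESS both when it scheduled new moves and when some moves failed, so retries happen simply by running another round.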


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 41ea1f3..98bd58e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -166,6 +167,10 @@ public class Dispatcher {
     void clear() {
       map.clear();
     }
+
+    public Collection<G> values() {
+      return map.values();
+    }
   }
 
   /** This class keeps track of a scheduled block move */
@@ -306,6 +311,7 @@ public class Dispatcher {
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
         LOG.warn("Failed to move " + this + ": " + e.getMessage());
+        target.getDDatanode().setHasFailure();
         // Proxy or target may have some issues, delay before using these nodes
         // further in order to avoid a potential storm of "threads quota
         // exceeded" warnings when the dispatcher gets out of sync with work
@@ -366,6 +372,19 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
+
+    @Override
+    public synchronized boolean isLocatedOn(StorageGroup loc) {
+      // currently we only check if replicas are located on the same DataNodes
+      // since we do not have the capability to store two replicas in the same
+      // DataNode even though they are on two different storage types
+      for (StorageGroup existing : locations) {
+        if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /** The class represents a desired move. */
@@ -469,6 +488,7 @@ public class Dispatcher {
     protected long delayUntil = 0L;
     /** blocks being moved but not confirmed yet */
     private final List<PendingMove> pendings;
+    private volatile boolean hasFailure = false;
     private final int maxConcurrentMoves;
 
     @Override
@@ -538,6 +558,10 @@ public class Dispatcher {
     synchronized boolean removePendingBlock(PendingMove pendingBlock) {
       return pendings.remove(pendingBlock);
     }
+
+    void setHasFailure() {
+      this.hasFailure = true;
+    }
   }
 
   /** A node that can be the sources of a block move */
@@ -884,7 +908,7 @@ public class Dispatcher {
     }
 
     // wait for all block moving to be done
-    waitForMoveCompletion();
+    waitForMoveCompletion(targets);
 
     return bytesMoved.get() - bytesLastMoved;
   }
@@ -892,23 +916,25 @@ public class Dispatcher {
   /** The sleeping period before checking if block move is completed again */
   static private long blockMoveWaitTime = 30000L;
 
-  /** set the sleeping period for block move completion check */
-  static void setBlockMoveWaitTime(long time) {
-    blockMoveWaitTime = time;
-  }
-
-  /** Wait for all block move confirmations. */
-  private void waitForMoveCompletion() {
+  /**
+   * Wait for all block move confirmations.
+   * @return true if any block move execution failed
+   */
+  public static boolean waitForMoveCompletion(
+      Iterable<? extends StorageGroup> targets) {
+    boolean hasFailure = false;
     for(;;) {
       boolean empty = true;
       for (StorageGroup t : targets) {
         if (!t.getDDatanode().isPendingQEmpty()) {
           empty = false;
           break;
+        } else {
+          hasFailure |= t.getDDatanode().hasFailure;
         }
       }
       if (empty) {
-        return; //all pending queues are empty
+        return hasFailure; // all pending queues are empty
       }
       try {
         Thread.sleep(blockMoveWaitTime);
@@ -919,7 +945,7 @@ public class Dispatcher {
 
   /**
    * Decide if the block is a good candidate to be moved from source to target.
-   * A block is a good candidate if 
+   * A block is a good candidate if
    * 1. the block is not in the process of being moved/has not been moved;
    * 2. the block does not have a replica on the target;
    * 3. doing the move does not reduce the number of racks that the block has
@@ -986,7 +1012,7 @@ public class Dispatcher {
    * Check if there are any replica (other than source) on the same node group
    * with target. If true, then target is not a good candidate for placing
    * specific replica as we don't want 2 replicas under the same nodegroup.
-   * 
+   *
    * @return true if there are any replica (other than source) on the same node
    *         group with target
    */
@@ -1011,9 +1037,17 @@ public class Dispatcher {
     movedBlocks.cleanup();
   }
 
+  /** set the sleeping period for block move completion check */
+  @VisibleForTesting
+  public static void setBlockMoveWaitTime(long time) {
+    blockMoveWaitTime = time;
+  }
+
   /** shutdown thread pools */
   public void shutdownNow() {
-    dispatchExecutor.shutdownNow();
+    if (dispatchExecutor != null) {
+      dispatchExecutor.shutdownNow();
+    }
     moveExecutor.shutdownNow();
   }
 

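The essential change in Dispatcher above: waitForMoveCompletion() is now a public static helper that, besides waiting for every target's pending-move queue to drain, reports whether any scheduled move failed (each failure is recorded on the target datanode via setHasFailure()). A self-contained sketch of that contract, with illustrative types standing in for StorageGroup/DDatanode:

    // Poll until every target's pending-move queue is empty, OR-ing together
    // the per-datanode failure flags so the caller knows whether a retry
    // round is needed. Target is an illustrative stand-in type.
    import java.util.List;

    public class WaitForMovesSketch {
      interface Target {
        boolean isPendingQueueEmpty();
        boolean hasFailure();
      }

      static boolean waitForMoveCompletion(List<? extends Target> targets,
          long pollMillis) throws InterruptedException {
        boolean hasFailure = false;
        for (;;) {
          boolean empty = true;
          for (Target t : targets) {
            if (!t.isPendingQueueEmpty()) {
              empty = false;                 // moves still in flight, keep waiting
              break;
            } else {
              hasFailure |= t.hasFailure();  // remember failures on drained nodes
            }
          }
          if (empty) {
            return hasFailure;               // all queues drained; report failures
          }
          Thread.sleep(pollMillis);
        }
      }
    }

The Mover (below) ORs this return value into its own hasRemaining flag, which is what turns a failed move into a retry in the next round rather than a silent loss.
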
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
index 557bfd3..18b9cd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
@@ -40,7 +40,7 @@ public class MovedBlocks<L> {
   public static class Locations<L> {
     private final Block block; // the block
     /** The locations of the replicas of the block. */
-    private final List<L> locations = new ArrayList<L>(3);
+    protected final List<L> locations = new ArrayList<L>(3);
     
     public Locations(Block block) {
       this.block = block;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 4dbe1d3..2bb1317 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.mover;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -130,9 +131,8 @@ public class Mover {
   private ExitStatus run() {
     try {
       init();
-      new Processor().processNamespace();
-
-      return ExitStatus.IN_PROGRESS;
+      boolean hasRemaining = new Processor().processNamespace();
+      return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
       return ExitStatus.ILLEGAL_ARGUMENTS;
@@ -223,16 +223,29 @@ public class Mover {
       }
     }
 
-    private void processNamespace() {
+    /**
+     * @return whether there is still remaining migration work for the next
+     *         round
+     */
+    private boolean processNamespace() {
       getSnapshottableDirs();
+      boolean hasRemaining = true;
       try {
-        processDirRecursively("", dfs.getFileInfo("/"));
+        hasRemaining = processDirRecursively("", dfs.getFileInfo("/"));
       } catch (IOException e) {
         LOG.warn("Failed to get root directory status. Ignore and continue.", e);
       }
+      // wait for pending move to finish and retry the failed migration
+      hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
+      return hasRemaining;
     }
 
-    private void processChildrenList(String fullPath) {
+    /**
+     * @return whether there is still remaining migration work for the next
+     *         round
+     */
+    private boolean processChildrenList(String fullPath) {
+      boolean hasRemaining = false;
       for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
         final DirectoryListing children;
         try {
@@ -240,124 +253,128 @@ public class Mover {
         } catch(IOException e) {
           LOG.warn("Failed to list directory " + fullPath
               + ". Ignore the directory and continue.", e);
-          return;
+          return hasRemaining;
         }
         if (children == null) {
-          return;
+          return hasRemaining;
         }
         for (HdfsFileStatus child : children.getPartialListing()) {
-          processDirRecursively(fullPath, child);
+          hasRemaining |= processDirRecursively(fullPath, child);
         }
-        if (!children.hasMore()) {
+        if (children.hasMore()) {
           lastReturnedName = children.getLastName();
         } else {
-          return;
+          return hasRemaining;
         }
       }
     }
 
-    private void processDirRecursively(String parent, HdfsFileStatus status) {
+    /** @return whether the migration requires next round */
+    private boolean processDirRecursively(String parent,
+                                          HdfsFileStatus status) {
       String fullPath = status.getFullName(parent);
-      if (status.isSymlink()) {
-        return; //ignore symlinks
-      } else if (status.isDir()) {
+      boolean hasRemaining = false;
+      if (status.isDir()) {
         if (!fullPath.endsWith(Path.SEPARATOR)) {
-          fullPath = fullPath + Path.SEPARATOR; 
+          fullPath = fullPath + Path.SEPARATOR;
         }
 
-        processChildrenList(fullPath);
+        hasRemaining = processChildrenList(fullPath);
         // process snapshots if this is a snapshottable directory
         if (snapshottableDirs.contains(fullPath)) {
           final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
-          processChildrenList(dirSnapshot);
+          hasRemaining |= processChildrenList(dirSnapshot);
         }
-      } else { // file
+      } else if (!status.isSymlink()) { // file
         try {
-          if (isSnapshotPathInCurrent(fullPath)) {
+          if (!isSnapshotPathInCurrent(fullPath)) {
             // the full path is a snapshot path but it is also included in the
             // current directory tree, thus ignore it.
-            return;
+            hasRemaining = processFile((HdfsLocatedFileStatus)status);
           }
         } catch (IOException e) {
           LOG.warn("Failed to check the status of " + parent
               + ". Ignore it and continue.", e);
-          return;
+          return false;
         }
-        processFile(parent, (HdfsLocatedFileStatus)status);
       }
+      return hasRemaining;
     }
 
-    private void processFile(String parent, HdfsLocatedFileStatus status) { 
+    /** @return true if it is necessary to run another round of migration */
+    private boolean processFile(HdfsLocatedFileStatus status) {
       final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
           status.getStoragePolicy());
       final List<StorageType> types = policy.chooseStorageTypes(
           status.getReplication());
 
-      final LocatedBlocks locations = status.getBlockLocations();
-      for(LocatedBlock lb : locations.getLocatedBlocks()) {
-        final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes());
+      final LocatedBlocks locatedBlocks = status.getBlockLocations();
+      boolean hasRemaining = false;
+      for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        final StorageTypeDiff diff = new StorageTypeDiff(types,
+            lb.getStorageTypes());
         if (!diff.removeOverlap()) {
-          scheduleMoves4Block(diff, lb);
+          if (scheduleMoves4Block(diff, lb)) {
+            hasRemaining |= (diff.existing.size() > 1 &&
+                diff.expected.size() > 1);
+          } else {
+            hasRemaining = true;
+          }
         }
       }
+      return hasRemaining;
     }
 
-    void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
       final List<MLocation> locations = MLocation.toLocations(lb);
       Collections.shuffle(locations);
       final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
 
-      for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
-        final StorageType t = i.next();
-        for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
-          final MLocation ml = j.next();
-          final Source source = storages.getSource(ml); 
+      for (final StorageType t : diff.existing) {
+        for (final MLocation ml : locations) {
+          final Source source = storages.getSource(ml);
           if (ml.storageType == t) {
-            // try to schedule replica move.
-            if (scheduleMoveReplica(db, ml, source, diff.expected)) {
-              i.remove();
-              j.remove();
-              return;
+            // try to schedule one replica move.
+            if (scheduleMoveReplica(db, source, diff.expected)) {
+              return true;
             }
           }
         }
       }
+      return false;
     }
 
+    @VisibleForTesting
     boolean scheduleMoveReplica(DBlock db, MLocation ml,
-        List<StorageType> targetTypes) {
-      return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
+                                List<StorageType> targetTypes) {
+      return scheduleMoveReplica(db, storages.getSource(ml), targetTypes);
     }
 
-    boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
+    boolean scheduleMoveReplica(DBlock db, Source source,
         List<StorageType> targetTypes) {
       if (dispatcher.getCluster().isNodeGroupAware()) {
-        if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
+        if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
         }
       }
       
       // Then, match nodes on the same rack
-      if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
+      if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) {
         return true;
       }
       // At last, match all remaining nodes
-      if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
-        return true;
-      }
-      return false;
+      return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
     }
 
-    boolean chooseTarget(DBlock db, MLocation ml, Source source,
+    boolean chooseTarget(DBlock db, Source source,
         List<StorageType> targetTypes, Matcher matcher) {
       final NetworkTopology cluster = dispatcher.getCluster(); 
-      for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
-        final StorageType t = i.next();
+      for (StorageType t : targetTypes) {
         for(StorageGroup target : storages.getTargetStorages(t)) {
-          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
+          if (matcher.match(cluster, source.getDatanodeInfo(),
+              target.getDatanodeInfo())) {
             final PendingMove pm = source.addPendingMove(db, target);
             if (pm != null) {
-              i.remove();
               dispatcher.executePendingMove(pm);
               return true;
             }
@@ -367,7 +384,6 @@ public class Mover {
       return false;
     }
   }
-  
 
   static class MLocation {
     final DatanodeInfo datanode;
@@ -392,7 +408,8 @@ public class Mover {
     }
   }
 
-  private static class StorageTypeDiff {
+  @VisibleForTesting
+  static class StorageTypeDiff {
     final List<StorageType> expected;
     final List<StorageType> existing;
 
@@ -403,7 +420,8 @@ public class Mover {
     
     /**
      * Remove the overlap between the expected types and the existing types.
-     * @return if the existing types is empty after removed the overlap.
+     * @return if the existing types or the expected types are empty after
+     *         removing the overlap.
      */
     boolean removeOverlap() { 
       for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
@@ -412,38 +430,42 @@ public class Mover {
           i.remove();
         }
       }
-      return existing.isEmpty();
+      return expected.isEmpty() || existing.isEmpty();
     }
   }
 
   static int run(Collection<URI> namenodes, Configuration conf)
       throws IOException, InterruptedException {
-    final long sleeptime = 2000*conf.getLong(
+    final long sleeptime = 2000 * conf.getLong(
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
     LOG.info("namenodes = " + namenodes);
     
     List<NameNodeConnector> connectors = Collections.emptyList();
     try {
-      connectors = NameNodeConnector.newNameNodeConnectors(namenodes, 
+      connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
             Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
     
-      while (true) {
+      while (connectors.size() > 0) {
         Collections.shuffle(connectors);
-        for(NameNodeConnector nnc : connectors) {
+        Iterator<NameNodeConnector> iter = connectors.iterator();
+        while (iter.hasNext()) {
+          NameNodeConnector nnc = iter.next();
           final Mover m = new Mover(nnc, conf);
           final ExitStatus r = m.run();
 
-          if (r != ExitStatus.IN_PROGRESS) {
-            //must be an error statue, return.
+          if (r == ExitStatus.SUCCESS) {
+            iter.remove();
+          } else if (r != ExitStatus.IN_PROGRESS) {
+            // must be an error status, return
             return r.getExitCode();
           }
         }
-
         Thread.sleep(sleeptime);
       }
+      return ExitStatus.SUCCESS.getExitCode();
     } finally {
-      for(NameNodeConnector nnc : connectors) {
+      for (NameNodeConnector nnc : connectors) {
         IOUtils.cleanup(LOG, nnc);
       }
     }

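One subtle fix above is StorageTypeDiff.removeOverlap(): it now returns true when either list ends up empty after cancelling matching types, so a block whose expected storage types are all already satisfied (even if extra replicas sit on other types) no longer triggers moves. An illustrative sketch of that check, using String in place of StorageType:

    // Illustrative version of the overlap-removal check. For a COLD file with
    // replicas on [DISK, DISK, ARCHIVE] and an expected list of
    // [ARCHIVE, ARCHIVE, ARCHIVE], one ARCHIVE pair cancels out, leaving
    // existing=[DISK, DISK] and expected=[ARCHIVE, ARCHIVE]; the method
    // returns false, so two replica moves still need to be scheduled.
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class StorageTypeDiffSketch {
      final List<String> expected;   // storage types the policy wants
      final List<String> existing;   // storage types the replicas currently use

      StorageTypeDiffSketch(List<String> expected, List<String> existing) {
        this.expected = new ArrayList<>(expected);
        this.existing = new ArrayList<>(existing);
      }

      /** Cancel matching pairs; true if either list is empty afterwards. */
      boolean removeOverlap() {
        for (Iterator<String> i = existing.iterator(); i.hasNext();) {
          if (expected.remove(i.next())) {
            i.remove();
          }
        }
        return expected.isEmpty() || existing.isEmpty();
      }

      public static void main(String[] args) {
        StorageTypeDiffSketch diff = new StorageTypeDiffSketch(
            Arrays.asList("ARCHIVE", "ARCHIVE", "ARCHIVE"),
            Arrays.asList("DISK", "DISK", "ARCHIVE"));
        System.out.println(diff.removeOverlap());  // false: moves still needed
      }
    }
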
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
new file mode 100644
index 0000000..d2a7fcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.*;
+
+/**
+ * Test the data migration tool (for Archival Storage)
+ */
+public class TestStorageMover {
+  private static final long BLOCK_SIZE = 1024;
+  private static final short REPL = 3;
+  private static final int NUM_DATANODES = 6;
+  private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
+  private static final BlockStoragePolicy.Suite DEFAULT_POLICIES;
+  private static final BlockStoragePolicy HOT;
+  private static final BlockStoragePolicy WARM;
+  private static final BlockStoragePolicy COLD;
+
+  static {
+    DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new
+        HdfsConfiguration());
+    HOT = DEFAULT_POLICIES.getPolicy("HOT");
+    WARM = DEFAULT_POLICIES.getPolicy("WARM");
+    COLD = DEFAULT_POLICIES.getPolicy("COLD");
+    Dispatcher.setBlockMoveWaitTime(10 * 1000);
+  }
+
+  /**
+   * This scheme defines files/directories and their block storage policies. It
+   * also defines snapshots.
+   */
+  static class NamespaceScheme {
+    final List<Path> files;
+    final Map<Path, List<String>> snapshotMap;
+    final Map<Path, BlockStoragePolicy> policyMap;
+
+    NamespaceScheme(List<Path> files, Map<Path,List<String>> snapshotMap,
+                    Map<Path, BlockStoragePolicy> policyMap) {
+      this.files = files;
+      this.snapshotMap = snapshotMap == null ?
+          new HashMap<Path, List<String>>() : snapshotMap;
+      this.policyMap = policyMap;
+    }
+  }
+
+  /**
+   * This scheme defines DataNodes and their storage, including storage types
+   * and remaining capacities.
+   */
+  static class ClusterScheme {
+    final Configuration conf;
+    final int numDataNodes;
+    final short repl;
+    final StorageType[][] storageTypes;
+    final long[][] storageCapacities;
+
+    ClusterScheme(Configuration conf, int numDataNodes, short repl,
+        StorageType[][] types, long[][] capacities) {
+      Preconditions.checkArgument(types == null || types.length == numDataNodes);
+      Preconditions.checkArgument(capacities == null || capacities.length ==
+          numDataNodes);
+      this.conf = conf;
+      this.numDataNodes = numDataNodes;
+      this.repl = repl;
+      this.storageTypes = types;
+      this.storageCapacities = capacities;
+    }
+  }
+
+  class MigrationTest {
+    private final ClusterScheme clusterScheme;
+    private final NamespaceScheme nsScheme;
+    private final Configuration conf;
+
+    private MiniDFSCluster cluster;
+    private DistributedFileSystem dfs;
+    private final BlockStoragePolicy.Suite policies;
+
+    MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
+      this.clusterScheme = cScheme;
+      this.nsScheme = nsScheme;
+      this.conf = clusterScheme.conf;
+      this.policies = BlockStoragePolicy.readBlockStorageSuite(conf);
+    }
+
+    /**
+     * Set up the cluster and start NameNode and DataNodes according to the
+     * corresponding scheme.
+     */
+    void setupCluster() throws Exception {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme
+          .numDataNodes).storageTypes(clusterScheme.storageTypes)
+          .storageCapacities(clusterScheme.storageCapacities).build();
+      cluster.waitActive();
+      dfs = cluster.getFileSystem();
+    }
+
+    void shutdownCluster() throws Exception {
+      IOUtils.cleanup(null, dfs);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+
+    /**
+     * Create files/directories and set their storage policies according to the
+     * corresponding scheme.
+     */
+    void prepareNamespace() throws Exception {
+      for (Path file : nsScheme.files) {
+        DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
+            0L);
+      }
+      for (Map.Entry<Path, List<String>> entry : nsScheme.snapshotMap.entrySet()) {
+        for (String snapshot : entry.getValue()) {
+          SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
+        }
+      }
+      for (Map.Entry<Path, BlockStoragePolicy> entry : nsScheme.policyMap.entrySet()) {
+        dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
+      }
+    }
+
+    /**
+     * Run the migration tool.
+     */
+    void migrate(String... args) throws Exception {
+      runMover();
+    }
+
+    /**
+     * Verify block locations after running the migration tool.
+     */
+    void verify(boolean verifyAll) throws Exception {
+      if (verifyAll) {
+        verifyNamespace();
+      } else {
+        // TODO verify according to the given path list
+
+      }
+    }
+
+    private void runMover() throws Exception {
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int result = Mover.run(namenodes, conf);
+      Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
+    }
+
+    private void verifyNamespace() throws Exception {
+      HdfsFileStatus status = dfs.getClient().getFileInfo("/");
+      verifyRecursively(null, status);
+    }
+
+    private void verifyRecursively(final Path parent,
+        final HdfsFileStatus status) throws Exception {
+      if (status.isDir()) {
+        Path fullPath = parent == null ?
+            new Path("/") : status.getFullPath(parent);
+        DirectoryListing children = dfs.getClient().listPaths(
+            fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true);
+        for (HdfsFileStatus child : children.getPartialListing()) {
+          verifyRecursively(fullPath, child);
+        }
+      } else if (!status.isSymlink()) { // is file
+        HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
+        byte policyId = fileStatus.getStoragePolicy();
+        BlockStoragePolicy policy = policies.getPolicy(policyId);
+        final List<StorageType> types = policy.chooseStorageTypes(
+            status.getReplication());
+        for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
+          final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
+              lb.getStorageTypes());
+          Assert.assertTrue(diff.removeOverlap());
+        }
+      }
+    }
+  }
+
+  private static StorageType[][] genStorageTypes(int numDataNodes) {
+    StorageType[][] types = new StorageType[numDataNodes][];
+    for (int i = 0; i < types.length; i++) {
+      types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
+    }
+    return types;
+  }
+
+  private void runTest(MigrationTest test) throws Exception {
+    test.setupCluster();
+    try {
+      test.prepareNamespace();
+      test.migrate();
+      Thread.sleep(5000); // let the NN finish deletion
+      test.verify(true);
+    } finally {
+      test.shutdownCluster();
+    }
+  }
+
+  /**
+   * A normal case for Mover: move a file into archival storage
+   */
+  @Test
+  public void testMigrateFileToArchival() throws Exception {
+    final Path foo = new Path("/foo");
+    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
+    policyMap.put(foo, COLD);
+    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null,
+        policyMap);
+    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+    runTest(test);
+  }
+}