Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/08/28 03:13:54 UTC
git commit: HDFS-6944. Archival Storage: add retry and termination logic for Mover. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-6584 b7ded466b -> a26aa6bd0
HDFS-6944. Archival Storage: add retry and termination logic for Mover. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a26aa6bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a26aa6bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a26aa6bd
Branch: refs/heads/HDFS-6584
Commit: a26aa6bd0716da89853566961390d711511084e3
Parents: b7ded46
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Aug 27 14:20:54 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Aug 27 14:20:54 2014 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/server/balancer/Dispatcher.java | 58 ++++-
.../hdfs/server/balancer/MovedBlocks.java | 2 +-
.../apache/hadoop/hdfs/server/mover/Mover.java | 152 +++++++-----
.../hdfs/server/mover/TestStorageMover.java | 247 +++++++++++++++++++
4 files changed, 381 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
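In brief, the new termination logic works like this: each round runs the Mover once against every NameNode that still has migration work; a SUCCESS result drops that NameNode's connector from the list, IN_PROGRESS keeps it for another round after a sleep, and any other exit status is treated as an error that terminates the tool. A condensed sketch of the loop (a restatement of the Mover.run hunk further down, not standalone code):

    while (!connectors.isEmpty()) {
      Collections.shuffle(connectors);
      for (Iterator<NameNodeConnector> iter = connectors.iterator();
           iter.hasNext();) {
        final ExitStatus r = new Mover(iter.next(), conf).run();
        if (r == ExitStatus.SUCCESS) {
          iter.remove();            // this namespace is fully migrated
        } else if (r != ExitStatus.IN_PROGRESS) {
          return r.getExitCode();   // unrecoverable error: terminate
        }                           // IN_PROGRESS: retry in the next round
      }
      Thread.sleep(sleeptime);      // 2 x heartbeat interval between rounds
    }
    return ExitStatus.SUCCESS.getExitCode();  // every namespace finished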
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 41ea1f3..98bd58e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -166,6 +167,10 @@ public class Dispatcher {
void clear() {
map.clear();
}
+
+ public Collection<G> values() {
+ return map.values();
+ }
}
/** This class keeps track of a scheduled block move */
@@ -306,6 +311,7 @@ public class Dispatcher {
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
+ target.getDDatanode().setHasFailure();
// Proxy or target may have some issues, delay before using these nodes
// further in order to avoid a potential storm of "threads quota
// exceeded" warnings when the dispatcher gets out of sync with work
@@ -366,6 +372,19 @@ public class Dispatcher {
public DBlock(Block block) {
super(block);
}
+
+ @Override
+ public synchronized boolean isLocatedOn(StorageGroup loc) {
+ // currently we only check whether a replica is located on the same
+ // DataNode, since we do not have the capability to store two replicas
+ // of the same block on one DataNode, even on different storage types
+ for (StorageGroup existing : locations) {
+ if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
+ return true;
+ }
+ }
+ return false;
+ }
}
/** The class represents a desired move. */
@@ -469,6 +488,7 @@ public class Dispatcher {
protected long delayUntil = 0L;
/** blocks being moved but not confirmed yet */
private final List<PendingMove> pendings;
+ private volatile boolean hasFailure = false;
private final int maxConcurrentMoves;
@Override
@@ -538,6 +558,10 @@ public class Dispatcher {
synchronized boolean removePendingBlock(PendingMove pendingBlock) {
return pendings.remove(pendingBlock);
}
+
+ void setHasFailure() {
+ this.hasFailure = true;
+ }
}
/** A node that can be the sources of a block move */
@@ -884,7 +908,7 @@ public class Dispatcher {
}
// wait for all block moving to be done
- waitForMoveCompletion();
+ waitForMoveCompletion(targets);
return bytesMoved.get() - bytesLastMoved;
}
@@ -892,23 +916,25 @@ public class Dispatcher {
/** The sleeping period before checking if block move is completed again */
static private long blockMoveWaitTime = 30000L;
- /** set the sleeping period for block move completion check */
- static void setBlockMoveWaitTime(long time) {
- blockMoveWaitTime = time;
- }
-
- /** Wait for all block move confirmations. */
- private void waitForMoveCompletion() {
+ /**
+ * Wait for all block move confirmations.
+ * @return true if there is a failed move execution
+ */
+ public static boolean waitForMoveCompletion(
+ Iterable<? extends StorageGroup> targets) {
+ boolean hasFailure = false;
for(;;) {
boolean empty = true;
for (StorageGroup t : targets) {
if (!t.getDDatanode().isPendingQEmpty()) {
empty = false;
break;
+ } else {
+ hasFailure |= t.getDDatanode().hasFailure;
}
}
if (empty) {
- return; //all pending queues are empty
+ return hasFailure; // all pending queues are empty
}
try {
Thread.sleep(blockMoveWaitTime);
@@ -919,7 +945,7 @@ public class Dispatcher {
/**
* Decide if the block is a good candidate to be moved from source to target.
- * A block is a good candidate if
+ * A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
* 2. the block does not have a replica on the target;
* 3. doing the move does not reduce the number of racks that the block has
@@ -986,7 +1012,7 @@ public class Dispatcher {
* Check if there are any replica (other than source) on the same node group
* with target. If true, then target is not a good candidate for placing
* specific replica as we don't want 2 replicas under the same nodegroup.
- *
+ *
* @return true if there are any replica (other than source) on the same node
* group with target
*/
@@ -1011,9 +1037,17 @@ public class Dispatcher {
movedBlocks.cleanup();
}
+ /** set the sleeping period for block move completion check */
+ @VisibleForTesting
+ public static void setBlockMoveWaitTime(long time) {
+ blockMoveWaitTime = time;
+ }
+
/** shutdown thread pools */
public void shutdownNow() {
- dispatchExecutor.shutdownNow();
+ if (dispatchExecutor != null) {
+ dispatchExecutor.shutdownNow();
+ }
moveExecutor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
index 557bfd3..18b9cd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java
@@ -40,7 +40,7 @@ public class MovedBlocks<L> {
public static class Locations<L> {
private final Block block; // the block
/** The locations of the replicas of the block. */
- private final List<L> locations = new ArrayList<L>(3);
+ protected final List<L> locations = new ArrayList<L>(3);
public Locations(Block block) {
this.block = block;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 4dbe1d3..2bb1317 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.mover;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -130,9 +131,8 @@ public class Mover {
private ExitStatus run() {
try {
init();
- new Processor().processNamespace();
-
- return ExitStatus.IN_PROGRESS;
+ boolean hasRemaining = new Processor().processNamespace();
+ return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
@@ -223,16 +223,29 @@ public class Mover {
}
}
- private void processNamespace() {
+ /**
+ * @return whether there is still remaining migration work for the next
+ * round
+ */
+ private boolean processNamespace() {
getSnapshottableDirs();
+ boolean hasRemaining = true;
try {
- processDirRecursively("", dfs.getFileInfo("/"));
+ hasRemaining = processDirRecursively("", dfs.getFileInfo("/"));
} catch (IOException e) {
LOG.warn("Failed to get root directory status. Ignore and continue.", e);
}
+ // wait for pending moves to finish and retry failed migrations
+ hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
+ return hasRemaining;
}
- private void processChildrenList(String fullPath) {
+ /**
+ * @return whether there is still remaining migration work for the next
+ * round
+ */
+ private boolean processChildrenList(String fullPath) {
+ boolean hasRemaining = false;
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
@@ -240,124 +253,128 @@ public class Mover {
} catch(IOException e) {
LOG.warn("Failed to list directory " + fullPath
+ ". Ignore the directory and continue.", e);
- return;
+ return hasRemaining;
}
if (children == null) {
- return;
+ return hasRemaining;
}
for (HdfsFileStatus child : children.getPartialListing()) {
- processDirRecursively(fullPath, child);
+ hasRemaining |= processDirRecursively(fullPath, child);
}
- if (!children.hasMore()) {
+ if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
- return;
+ return hasRemaining;
}
}
}
- private void processDirRecursively(String parent, HdfsFileStatus status) {
+ /** @return whether the migration requires another round */
+ private boolean processDirRecursively(String parent,
+ HdfsFileStatus status) {
String fullPath = status.getFullName(parent);
- if (status.isSymlink()) {
- return; //ignore symlinks
- } else if (status.isDir()) {
+ boolean hasRemaining = false;
+ if (status.isDir()) {
if (!fullPath.endsWith(Path.SEPARATOR)) {
- fullPath = fullPath + Path.SEPARATOR;
+ fullPath = fullPath + Path.SEPARATOR;
}
- processChildrenList(fullPath);
+ hasRemaining = processChildrenList(fullPath);
// process snapshots if this is a snapshottable directory
if (snapshottableDirs.contains(fullPath)) {
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
- processChildrenList(dirSnapshot);
+ hasRemaining |= processChildrenList(dirSnapshot);
}
- } else { // file
+ } else if (!status.isSymlink()) { // file
try {
- if (isSnapshotPathInCurrent(fullPath)) {
+ if (!isSnapshotPathInCurrent(fullPath)) {
// skip the file if its full path is a snapshot path that is also
// included in the current directory tree; otherwise process it
- return;
+ hasRemaining = processFile((HdfsLocatedFileStatus)status);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
+ ". Ignore it and continue.", e);
- return;
+ return false;
}
- processFile(parent, (HdfsLocatedFileStatus)status);
}
+ return hasRemaining;
}
- private void processFile(String parent, HdfsLocatedFileStatus status) {
+ /** @return true if it is necessary to run another round of migration */
+ private boolean processFile(HdfsLocatedFileStatus status) {
final BlockStoragePolicy policy = blockStoragePolicies.getPolicy(
status.getStoragePolicy());
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
- final LocatedBlocks locations = status.getBlockLocations();
- for(LocatedBlock lb : locations.getLocatedBlocks()) {
- final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes());
+ final LocatedBlocks locatedBlocks = status.getBlockLocations();
+ boolean hasRemaining = false;
+ for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+ final StorageTypeDiff diff = new StorageTypeDiff(types,
+ lb.getStorageTypes());
if (!diff.removeOverlap()) {
- scheduleMoves4Block(diff, lb);
+ if (scheduleMoves4Block(diff, lb)) {
+ hasRemaining |= (diff.existing.size() > 1 &&
+ diff.expected.size() > 1);
+ } else {
+ hasRemaining = true;
+ }
}
}
+ return hasRemaining;
}
- void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
final List<MLocation> locations = MLocation.toLocations(lb);
Collections.shuffle(locations);
final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
- for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
- final StorageType t = i.next();
- for(final Iterator<MLocation> j = locations.iterator(); j.hasNext(); ) {
- final MLocation ml = j.next();
- final Source source = storages.getSource(ml);
+ for (final StorageType t : diff.existing) {
+ for (final MLocation ml : locations) {
+ final Source source = storages.getSource(ml);
if (ml.storageType == t) {
- // try to schedule replica move.
- if (scheduleMoveReplica(db, ml, source, diff.expected)) {
- i.remove();
- j.remove();
- return;
+ // try to schedule one replica move.
+ if (scheduleMoveReplica(db, source, diff.expected)) {
+ return true;
}
}
}
}
+ return false;
}
+ @VisibleForTesting
boolean scheduleMoveReplica(DBlock db, MLocation ml,
- List<StorageType> targetTypes) {
- return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
+ List<StorageType> targetTypes) {
+ return scheduleMoveReplica(db, storages.getSource(ml), targetTypes);
}
- boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
+ boolean scheduleMoveReplica(DBlock db, Source source,
List<StorageType> targetTypes) {
if (dispatcher.getCluster().isNodeGroupAware()) {
- if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
+ if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
return true;
}
}
// Then, match nodes on the same rack
- if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) {
+ if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) {
return true;
}
// At last, match all remaining nodes
- if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) {
- return true;
- }
- return false;
+ return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
}
- boolean chooseTarget(DBlock db, MLocation ml, Source source,
+ boolean chooseTarget(DBlock db, Source source,
List<StorageType> targetTypes, Matcher matcher) {
final NetworkTopology cluster = dispatcher.getCluster();
- for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
- final StorageType t = i.next();
+ for (StorageType t : targetTypes) {
for(StorageGroup target : storages.getTargetStorages(t)) {
- if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
+ if (matcher.match(cluster, source.getDatanodeInfo(),
+ target.getDatanodeInfo())) {
final PendingMove pm = source.addPendingMove(db, target);
if (pm != null) {
- i.remove();
dispatcher.executePendingMove(pm);
return true;
}
@@ -367,7 +384,6 @@ public class Mover {
return false;
}
}
-
static class MLocation {
final DatanodeInfo datanode;
@@ -392,7 +408,8 @@ public class Mover {
}
}
- private static class StorageTypeDiff {
+ @VisibleForTesting
+ static class StorageTypeDiff {
final List<StorageType> expected;
final List<StorageType> existing;
@@ -403,7 +420,8 @@ public class Mover {
/**
* Remove the overlap between the expected types and the existing types.
- * @return if the existing types is empty after removed the overlap.
+ * @return whether either the existing or the expected type list is empty
+ * after removing the overlap.
*/
boolean removeOverlap() {
for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
@@ -412,38 +430,42 @@ public class Mover {
i.remove();
}
}
- return existing.isEmpty();
+ return expected.isEmpty() || existing.isEmpty();
}
}
static int run(Collection<URI> namenodes, Configuration conf)
throws IOException, InterruptedException {
- final long sleeptime = 2000*conf.getLong(
+ final long sleeptime = 2000 * conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
LOG.info("namenodes = " + namenodes);
List<NameNodeConnector> connectors = Collections.emptyList();
try {
- connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
+ connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Mover.class.getSimpleName(), MOVER_ID_PATH, conf);
- while (true) {
+ while (connectors.size() > 0) {
Collections.shuffle(connectors);
- for(NameNodeConnector nnc : connectors) {
+ Iterator<NameNodeConnector> iter = connectors.iterator();
+ while (iter.hasNext()) {
+ NameNodeConnector nnc = iter.next();
final Mover m = new Mover(nnc, conf);
final ExitStatus r = m.run();
- if (r != ExitStatus.IN_PROGRESS) {
- //must be an error statue, return.
+ if (r == ExitStatus.SUCCESS) {
+ iter.remove();
+ } else if (r != ExitStatus.IN_PROGRESS) {
+ // must be an error status, return
return r.getExitCode();
}
}
-
Thread.sleep(sleeptime);
}
+ return ExitStatus.SUCCESS.getExitCode();
} finally {
- for(NameNodeConnector nnc : connectors) {
+ for (NameNodeConnector nnc : connectors) {
IOUtils.cleanup(LOG, nnc);
}
}
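To make the revised removeOverlap() contract concrete, here is a small worked example (illustrative values only; the constructor signature matches its use in TestStorageMover below):

    // COLD expects [ARCHIVE, ARCHIVE, ARCHIVE]; two replicas are already
    // migrated, so the block currently sits on [DISK, ARCHIVE, ARCHIVE].
    final StorageTypeDiff diff = new StorageTypeDiff(
        Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE,
            StorageType.ARCHIVE),
        new StorageType[] {StorageType.DISK, StorageType.ARCHIVE,
            StorageType.ARCHIVE});
    if (!diff.removeOverlap()) {
      // expected == [ARCHIVE], existing == [DISK]: one DISK -> ARCHIVE move
      // is still needed for this block. Had all three replicas been on
      // ARCHIVE, both lists would have emptied and removeOverlap() would
      // have returned true, i.e. nothing left to migrate.
    }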
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a26aa6bd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
new file mode 100644
index 0000000..d2a7fcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.*;
+
+/**
+ * Test the data migration tool (for Archival Storage)
+ */
+public class TestStorageMover {
+ private static final long BLOCK_SIZE = 1024;
+ private static final short REPL = 3;
+ private static final int NUM_DATANODES = 6;
+ private static final Configuration DEFAULT_CONF = new HdfsConfiguration();
+ private static final BlockStoragePolicy.Suite DEFAULT_POLICIES;
+ private static final BlockStoragePolicy HOT;
+ private static final BlockStoragePolicy WARM;
+ private static final BlockStoragePolicy COLD;
+
+ static {
+ DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(new
+ HdfsConfiguration());
+ HOT = DEFAULT_POLICIES.getPolicy("HOT");
+ WARM = DEFAULT_POLICIES.getPolicy("WARM");
+ COLD = DEFAULT_POLICIES.getPolicy("COLD");
+ Dispatcher.setBlockMoveWaitTime(10 * 1000);
+ }
+
+ /**
+ * This scheme defines files/directories and their block storage policies. It
+ * also defines snapshots.
+ */
+ static class NamespaceScheme {
+ final List<Path> files;
+ final Map<Path, List<String>> snapshotMap;
+ final Map<Path, BlockStoragePolicy> policyMap;
+
+ NamespaceScheme(List<Path> files, Map<Path,List<String>> snapshotMap,
+ Map<Path, BlockStoragePolicy> policyMap) {
+ this.files = files;
+ this.snapshotMap = snapshotMap == null ?
+ new HashMap<Path, List<String>>() : snapshotMap;
+ this.policyMap = policyMap;
+ }
+ }
+
+ /**
+ * This scheme defines DataNodes and their storage, including storage types
+ * and remaining capacities.
+ */
+ static class ClusterScheme {
+ final Configuration conf;
+ final int numDataNodes;
+ final short repl;
+ final StorageType[][] storageTypes;
+ final long[][] storageCapacities;
+
+ ClusterScheme(Configuration conf, int numDataNodes, short repl,
+ StorageType[][] types, long[][] capacities) {
+ Preconditions.checkArgument(types == null || types.length == numDataNodes);
+ Preconditions.checkArgument(capacities == null || capacities.length ==
+ numDataNodes);
+ this.conf = conf;
+ this.numDataNodes = numDataNodes;
+ this.repl = repl;
+ this.storageTypes = types;
+ this.storageCapacities = capacities;
+ }
+ }
+
+ class MigrationTest {
+ private final ClusterScheme clusterScheme;
+ private final NamespaceScheme nsScheme;
+ private final Configuration conf;
+
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem dfs;
+ private final BlockStoragePolicy.Suite policies;
+
+ MigrationTest(ClusterScheme cScheme, NamespaceScheme nsScheme) {
+ this.clusterScheme = cScheme;
+ this.nsScheme = nsScheme;
+ this.conf = clusterScheme.conf;
+ this.policies = BlockStoragePolicy.readBlockStorageSuite(conf);
+ }
+
+ /**
+ * Set up the cluster and start NameNode and DataNodes according to the
+ * corresponding scheme.
+ */
+ void setupCluster() throws Exception {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(clusterScheme
+ .numDataNodes).storageTypes(clusterScheme.storageTypes)
+ .storageCapacities(clusterScheme.storageCapacities).build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ }
+
+ void shutdownCluster() throws Exception {
+ IOUtils.cleanup(null, dfs);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Create files/directories and set their storage policies according to the
+ * corresponding scheme.
+ */
+ void prepareNamespace() throws Exception {
+ for (Path file : nsScheme.files) {
+ DFSTestUtil.createFile(dfs, file, BLOCK_SIZE * 2, clusterScheme.repl,
+ 0L);
+ }
+ for (Map.Entry<Path, List<String>> entry : nsScheme.snapshotMap.entrySet()) {
+ for (String snapshot : entry.getValue()) {
+ SnapshotTestHelper.createSnapshot(dfs, entry.getKey(), snapshot);
+ }
+ }
+ for (Map.Entry<Path, BlockStoragePolicy> entry : nsScheme.policyMap.entrySet()) {
+ dfs.setStoragePolicy(entry.getKey(), entry.getValue().getName());
+ }
+ }
+
+ /**
+ * Run the migration tool.
+ */
+ void migrate(String... args) throws Exception {
+ runMover();
+ }
+
+ /**
+ * Verify block locations after running the migration tool.
+ */
+ void verify(boolean verifyAll) throws Exception {
+ if (verifyAll) {
+ verifyNamespace();
+ } else {
+ // TODO verify according to the given path list
+
+ }
+ }
+
+ private void runMover() throws Exception {
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ int result = Mover.run(namenodes, conf);
+ Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
+ }
+
+ private void verifyNamespace() throws Exception {
+ HdfsFileStatus status = dfs.getClient().getFileInfo("/");
+ verifyRecursively(null, status);
+ }
+
+ private void verifyRecursively(final Path parent,
+ final HdfsFileStatus status) throws Exception {
+ if (status.isDir()) {
+ Path fullPath = parent == null ?
+ new Path("/") : status.getFullPath(parent);
+ DirectoryListing children = dfs.getClient().listPaths(
+ fullPath.toString(), HdfsFileStatus.EMPTY_NAME, true);
+ for (HdfsFileStatus child : children.getPartialListing()) {
+ verifyRecursively(fullPath, child);
+ }
+ } else if (!status.isSymlink()) { // is file
+ HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
+ byte policyId = fileStatus.getStoragePolicy();
+ BlockStoragePolicy policy = policies.getPolicy(policyId);
+ final List<StorageType> types = policy.chooseStorageTypes(
+ status.getReplication());
+ for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
+ final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
+ lb.getStorageTypes());
+ Assert.assertTrue(diff.removeOverlap());
+ }
+ }
+ }
+ }
+
+ private static StorageType[][] genStorageTypes(int numDataNodes) {
+ StorageType[][] types = new StorageType[numDataNodes][];
+ for (int i = 0; i < types.length; i++) {
+ types[i] = new StorageType[]{StorageType.DISK, StorageType.ARCHIVE};
+ }
+ return types;
+ }
+
+ private void runTest(MigrationTest test) throws Exception {
+ test.setupCluster();
+ try {
+ test.prepareNamespace();
+ test.migrate();
+ Thread.sleep(5000); // let the NN finish deletion
+ test.verify(true);
+ } finally {
+ test.shutdownCluster();
+ }
+ }
+
+ /**
+ * A normal case for Mover: move a file into archival storage
+ */
+ @Test
+ public void testMigrateFileToArchival() throws Exception {
+ final Path foo = new Path("/foo");
+ Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
+ policyMap.put(foo, COLD);
+ NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(foo), null,
+ policyMap);
+ ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+ NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+ MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+ runTest(test);
+ }
+}
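The scheme/test scaffolding above composes easily; for instance, a hypothetical follow-on case (not part of this patch) exercising the WARM policy could reuse the same plumbing:

    // Hypothetical additional case (not in this patch): migrate to WARM.
    @Test
    public void testMigrateFileToWarm() throws Exception {
      final Path bar = new Path("/bar");
      Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
      policyMap.put(bar, WARM);
      NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(bar), null,
          policyMap);
      ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
          NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
      runTest(new MigrationTest(clusterScheme, nsScheme));
    }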