Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/06/03 20:52:16 UTC
hadoop git commit: HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7285 5f15084bd -> 673280df2
HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/673280df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/673280df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/673280df
Branch: refs/heads/HDFS-7285
Commit: 673280df24f0228bf01777035ceeab8807da8c40
Parents: 5f15084
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Wed Jun 3 11:51:58 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Wed Jun 3 11:51:58 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 +
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 21 ++-
.../hadoop/hdfs/server/balancer/Dispatcher.java | 148 +++++++++++++------
.../server/blockmanagement/BlockManager.java | 26 +++-
.../apache/hadoop/hdfs/server/mover/Mover.java | 38 ++++-
.../server/protocol/BlocksWithLocations.java | 25 ++++
.../hadoop-hdfs/src/main/proto/hdfs.proto | 3 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 15 ++
.../hadoop/hdfs/protocolPB/TestPBHelper.java | 51 +++++--
.../hdfs/server/balancer/TestBalancer.java | 76 ++++++++++
.../hadoop/hdfs/server/mover/TestMover.java | 124 +++++++++++++++-
11 files changed, 452 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 278f897..511ebec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -277,3 +277,6 @@
HDFS-8453. Erasure coding: properly handle start offset for internal blocks
in a block group. (Zhe Zhang via jing9)
+
+ HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic.
+ (Walter Su via zhz)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index b2415fa..0bfc3bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -211,6 +211,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -458,22 +459,32 @@ public class PBHelper {
}
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
- return BlockWithLocationsProto.newBuilder()
- .setBlock(convert(blk.getBlock()))
+ BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
+ .newBuilder().setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
- .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
- .build();
+ .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()));
+ if (blk instanceof StripedBlockWithLocations) {
+ StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
+ builder.setIndices(getByteString(sblk.getIndices()));
+ builder.setDataBlockNum(sblk.getDataBlockNum());
+ }
+ return builder.build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
final List<String> datanodeUuids = b.getDatanodeUuidsList();
final List<String> storageUuids = b.getStorageUuidsList();
final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
- return new BlockWithLocations(convert(b.getBlock()),
+ BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()),
datanodeUuids.toArray(new String[datanodeUuids.size()]),
storageUuids.toArray(new String[storageUuids.size()]),
convertStorageTypes(storageTypes, storageUuids.size()));
+ if (b.hasIndices()) {
+ blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(),
+ (short) b.getDataBlockNum());
+ }
+ return blk;
}
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
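For reference, the converters now round-trip the striped fields. A minimal sketch of the new path, mirroring the shapes exercised by TestPBHelper further down (the values are illustrative scaffolding, not taken from a real cluster):

    Block b = new Block(1, 0, 1);
    BlockWithLocations blk = new BlockWithLocations(b,
        new String[] {"dn1", "dn2", "dn3"},   // datanode UUIDs
        new String[] {"s1", "s2", "s3"},      // storage IDs
        new StorageType[] {
            StorageType.DISK, StorageType.DISK, StorageType.DISK});
    // Wrap as a striped group: one index byte per location, 6 data blocks.
    BlockWithLocations striped =
        new StripedBlockWithLocations(blk, new byte[] {0, 1, 2}, (short) 6);
    BlockWithLocationsProto proto = PBHelper.convert(striped);
    // hasIndices() is set on the proto, so the reverse conversion
    // reconstructs the StripedBlockWithLocations subtype.
    BlockWithLocations back = PBHelper.convert(proto);
    assert back instanceof StripedBlockWithLocations;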
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 4a8f40f..930001a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@@ -124,18 +127,17 @@ public class Dispatcher {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
/**
- * Get the block from the map;
- * if the block is not found, create a new block and put it in the map.
+ * Put the block into the map if it is absent.
+ * @return the mapped block: the given block if newly added, else the existing one
*/
- private DBlock get(Block b) {
- DBlock block = map.get(b);
- if (block == null) {
- block = new DBlock(b);
- map.put(b, block);
+ private DBlock putIfAbsent(Block blk, DBlock dblk) {
+ if (!map.containsKey(blk)) {
+ map.put(blk, dblk);
+ return dblk;
}
- return block;
+ return map.get(blk);
}
-
+
/** Remove all blocks except for the moved blocks. */
private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
@@ -176,9 +178,9 @@ public class Dispatcher {
}
}
- /** This class keeps track of a scheduled block move */
+ /** This class keeps track of a scheduled reportedBlock move */
public class PendingMove {
- private DBlock block;
+ private DBlock reportedBlock;
private Source source;
private DDatanode proxySource;
private StorageGroup target;
@@ -190,7 +192,7 @@ public class Dispatcher {
@Override
public String toString() {
- final Block b = block != null ? block.getBlock() : null;
+ final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
: " ";
return bStr + "from " + source.getDisplayName() + " to " + target
@@ -199,8 +201,8 @@ public class Dispatcher {
}
/**
- * Choose a block & a proxy source for this pendingMove whose source &
- * target have already been chosen.
+ * Choose a good block/block group from the source, derive the reportedBlock
+ * from it, and choose a proxy source for that reportedBlock.
*
* @return true if a block and its proxy are chosen; false otherwise
*/
@@ -224,7 +226,11 @@ public class Dispatcher {
synchronized (block) {
synchronized (movedBlocks) {
if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
- this.block = block;
+ if (block instanceof DBlockStriped) {
+ reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
+ } else {
+ reportedBlock = block;
+ }
if (chooseProxySource()) {
movedBlocks.put(block);
if (LOG.isDebugEnabled()) {
@@ -251,7 +257,7 @@ public class Dispatcher {
}
// if node group is supported, first try add nodes in the same node group
if (cluster.isNodeGroupAware()) {
- for (StorageGroup loc : block.getLocations()) {
+ for (StorageGroup loc : reportedBlock.getLocations()) {
if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
&& addTo(loc)) {
return true;
@@ -259,13 +265,13 @@ public class Dispatcher {
}
}
// check if there is replica which is on the same rack with the target
- for (StorageGroup loc : block.getLocations()) {
+ for (StorageGroup loc : reportedBlock.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
return true;
}
}
// find out a non-busy replica
- for (StorageGroup loc : block.getLocations()) {
+ for (StorageGroup loc : reportedBlock.getLocations()) {
if (addTo(loc)) {
return true;
}
@@ -273,7 +279,7 @@ public class Dispatcher {
return false;
}
- /** add to a proxy source for specific block movement */
+ /** add to a proxy source for specific reportedBlock movement */
private boolean addTo(StorageGroup g) {
final DDatanode dn = g.getDDatanode();
if (dn.addPendingBlock(this)) {
@@ -288,6 +294,7 @@ public class Dispatcher {
if (LOG.isDebugEnabled()) {
LOG.debug("Start moving " + this);
}
+ assert !(reportedBlock instanceof DBlockStriped);
Socket sock = new Socket();
DataOutputStream out = null;
@@ -302,7 +309,7 @@ public class Dispatcher {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
- block.getBlock());
+ reportedBlock.getBlock());
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
@@ -316,7 +323,7 @@ public class Dispatcher {
sendRequest(out, eb, accessToken);
receiveResponse(in);
- nnc.getBytesMoved().addAndGet(block.getNumBytes());
+ nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
LOG.info("Successfully moved " + this);
} catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage());
@@ -344,14 +351,14 @@ public class Dispatcher {
}
}
- /** Send a block replace request to the output stream */
+ /** Send a reportedBlock replace request to the output stream */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken) throws IOException {
new Sender(out).replaceBlock(eb, target.storageType, accessToken,
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
- /** Receive a block copy response from the input stream */
+ /** Receive a reportedBlock copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in));
@@ -359,13 +366,13 @@ public class Dispatcher {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
- String logInfo = "block move is failed";
+ String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
/** reset the object */
private void reset() {
- block = null;
+ reportedBlock = null;
source = null;
proxySource = null;
target = null;
@@ -377,6 +384,44 @@ public class Dispatcher {
public DBlock(Block block) {
super(block);
}
+
+ public long getNumBytes(StorageGroup storage) {
+ return super.getNumBytes();
+ }
+ }
+
+ public static class DBlockStriped extends DBlock {
+
+ final byte[] indices;
+ final short dataBlockNum;
+
+ public DBlockStriped(Block block, byte[] indices, short dataBlockNum) {
+ super(block);
+ this.indices = indices;
+ this.dataBlockNum = dataBlockNum;
+ }
+
+ public DBlock getInternalBlock(StorageGroup storage) {
+ int idxInLocs = locations.indexOf(storage);
+ if (idxInLocs == -1) {
+ return null;
+ }
+ byte idxInGroup = indices[idxInLocs];
+ long blkId = getBlock().getBlockId() + idxInGroup;
+ long numBytes = getInternalBlockLength(getNumBytes(),
+ HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, idxInGroup);
+ Block blk = new Block(getBlock());
+ blk.setBlockId(blkId);
+ blk.setNumBytes(numBytes);
+ DBlock dblk = new DBlock(blk);
+ dblk.addLocation(storage);
+ return dblk;
+ }
+
+ @Override
+ public long getNumBytes(StorageGroup storage) {
+ return getInternalBlock(storage).getNumBytes();
+ }
}
/** The class represents a desired move. */
@@ -452,7 +497,7 @@ public class Dispatcher {
private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
if (getDDatanode().addPendingBlock(pm)) {
if (pm.markMovedIfGoodBlock(block, getStorageType())) {
- incScheduledSize(pm.block.getNumBytes());
+ incScheduledSize(pm.reportedBlock.getNumBytes());
return pm;
} else {
getDDatanode().removePendingBlock(pm);
@@ -612,19 +657,34 @@ public class Dispatcher {
*/
private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
- final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+ final BlocksWithLocations newBlksLocs =
+ nnc.getBlocks(getDatanodeInfo(), size);
long bytesReceived = 0;
- for (BlockWithLocations blk : newBlocks.getBlocks()) {
- bytesReceived += blk.getBlock().getNumBytes();
+ for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
+
+ DBlock block;
+ if (blkLocs instanceof StripedBlockWithLocations) {
+ StripedBlockWithLocations sblkLocs =
+ (StripedBlockWithLocations) blkLocs;
+ // approximate size
+ bytesReceived += sblkLocs.getBlock().getNumBytes() /
+ sblkLocs.getDataBlockNum();
+ block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
+ sblkLocs.getDataBlockNum());
+ } else{
+ bytesReceived += blkLocs.getBlock().getNumBytes();
+ block = new DBlock(blkLocs.getBlock());
+ }
+
synchronized (globalBlocks) {
- final DBlock block = globalBlocks.get(blk.getBlock());
+ block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
synchronized (block) {
block.clearLocations();
// update locations
- final String[] datanodeUuids = blk.getDatanodeUuids();
- final StorageType[] storageTypes = blk.getStorageTypes();
+ final String[] datanodeUuids = blkLocs.getDatanodeUuids();
+ final StorageType[] storageTypes = blkLocs.getStorageTypes();
for (int i = 0; i < datanodeUuids.length; i++) {
final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]);
@@ -661,6 +721,8 @@ public class Dispatcher {
* target throttling has been considered. They are chosen only when they
* have the capacity to support this block move. The block should be
* dispatched immediately after this method is returned.
+ * If the block is a block group, only the internal block on this source
+ * will be dispatched.
*
* @return a move that's good for the source to dispatch immediately.
*/
@@ -672,7 +734,7 @@ public class Dispatcher {
if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
if (pendingBlock.chooseBlockAndProxy()) {
- long blockSize = pendingBlock.block.getNumBytes();
+ long blockSize = pendingBlock.reportedBlock.getNumBytes(this);
incScheduledSize(-blockSize);
task.size -= blockSize;
if (task.size == 0) {
@@ -744,7 +806,7 @@ public class Dispatcher {
blocksToReceive -= getBlockList();
continue;
} catch (IOException e) {
- LOG.warn("Exception while getting block list", e);
+ LOG.warn("Exception while getting reportedBlock list", e);
return;
}
} else {
@@ -883,7 +945,7 @@ public class Dispatcher {
}
public void executePendingMove(final PendingMove p) {
- // move the block
+ // move the reportedBlock
moveExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -928,17 +990,17 @@ public class Dispatcher {
}
}
- // wait for all block moving to be done
+ // wait for all reportedBlock moving to be done
waitForMoveCompletion(targets);
return getBytesMoved() - bytesLastMoved;
}
- /** The sleeping period before checking if block move is completed again */
+ /** The sleeping period before checking if reportedBlock move is completed again */
static private long blockMoveWaitTime = 30000L;
/**
- * Wait for all block move confirmations.
+ * Wait for all reportedBlock move confirmations.
* @return true if there is failed move execution
*/
public static boolean waitForMoveCompletion(
@@ -965,10 +1027,10 @@ public class Dispatcher {
}
/**
- * Decide if the block is a good candidate to be moved from source to target.
- * A block is a good candidate if
+ * Decide if the block/blockGroup is a good candidate to be moved from source
+ * to target. A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved;
- * 2. the block does not have a replica on the target;
+ * 2. the block does not have a replica/internalBlock on the target;
* 3. doing the move does not reduce the number of racks that the block has
*/
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
@@ -985,7 +1047,7 @@ public class Dispatcher {
}
final DatanodeInfo targetDatanode = target.getDatanodeInfo();
if (source.getDatanodeInfo().equals(targetDatanode)) {
- // the block is moved inside same DN
+ // the reportedBlock is moved inside same DN
return true;
}
@@ -1068,7 +1130,7 @@ public class Dispatcher {
movedBlocks.cleanup();
}
- /** set the sleeping period for block move completion check */
+ /** set the sleeping period for reportedBlock move completion check */
@VisibleForTesting
public static void setBlockMoveWaitTime(long time) {
blockMoveWaitTime = time;
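The core of the striped support above is DBlockStriped.getInternalBlock: an internal block's id is the group id plus its index within the group, and its length comes from StripedBlockUtil.getInternalBlockLength. A worked sketch of that arithmetic for a data-block index (the schema numbers are illustrative, not read from configuration):

    short dataBlockNum = 6;            // data blocks per group (illustrative)
    int cellSize = 64 * 1024;          // striping cell size (illustrative)
    long groupId = 0xB00;              // block id of the striped group
    long groupBytes = 10L * cellSize * dataBlockNum + 3L * cellSize;
    byte idxInGroup = 2;               // internal block held by this storage

    long blkId = groupId + idxInGroup; // ids are consecutive within a group
    // What getInternalBlockLength(groupBytes, cellSize, dataBlockNum,
    // idxInGroup) computes for a data-block index (parity indices differ):
    long stripeBytes = (long) cellSize * dataBlockNum;
    long fullStripes = groupBytes / stripeBytes;   // 10
    long lastStripe = groupBytes % stripeBytes;    // 3 cells
    long numBytes = fullStripes * cellSize
        + Math.min(cellSize, Math.max(0, lastStripe - (long) idxInGroup * cellSize));
    // The last stripe covers indices 0..2, so index 2 gets one more cell:
    // numBytes == 11 * cellSize.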
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 32757f9..48a1b35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -3265,9 +3266,10 @@ public class BlockManager {
/**
* Get all valid locations of the block & add the block to results
- * return the length of the added block; 0 if the block is not added
+ * @return the length of the added block; 0 if the block is not added. If the
+ * added block is a block group, return its approximate internal block size
*/
- private long addBlock(Block block, List<BlockWithLocations> results) {
+ private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
final List<DatanodeStorageInfo> locations = getValidLocations(block);
if(locations.size() == 0) {
return 0;
@@ -3281,9 +3283,23 @@ public class BlockManager {
storageIDs[i] = s.getStorageID();
storageTypes[i] = s.getStorageType();
}
- results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
- storageTypes));
- return block.getNumBytes();
+ BlockWithLocations blkWithLocs = new BlockWithLocations(block,
+ datanodeUuids, storageIDs, storageTypes);
+ if(block.isStriped()) {
+ BlockInfoStriped blockStriped = (BlockInfoStriped) block;
+ byte[] indices = new byte[locations.size()];
+ for (int i = 0; i < locations.size(); i++) {
+ indices[i] =
+ (byte) blockStriped.getStorageBlockIndex(locations.get(i));
+ }
+ results.add(new StripedBlockWithLocations(blkWithLocs, indices,
+ blockStriped.getDataBlockNum()));
+ // approximate size
+ return block.getNumBytes() / blockStriped.getDataBlockNum();
+ }else{
+ results.add(blkWithLocs);
+ return block.getNumBytes();
+ }
}
}
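A note on the approximate size returned above: getBlocks() budgets the response by bytes, but the Balancer moves one internal block at a time, so a striped group is counted at numBytes / dataBlockNum rather than its full group size. For example, assuming the branch defaults of 6 data and 3 parity blocks, a group holding 24 MB of data is accounted as 24 / 6 = 4 MB, roughly one internal block. Dispatcher.getBlockList() applies the same division on the receiving side so the two budgets stay consistent.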
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 8715ce4..ddfd1ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -46,12 +46,15 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
@@ -176,8 +179,20 @@ public class Mover {
}
}
- DBlock newDBlock(Block block, List<MLocation> locations) {
- final DBlock db = new DBlock(block);
+ DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
+ ECSchema ecSchema) {
+ Block blk = lb.getBlock().getLocalBlock();
+ DBlock db;
+ if (lb.isStriped()) {
+ LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+ byte[] indices = new byte[lsb.getBlockIndices().length];
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = (byte) lsb.getBlockIndices()[i];
+ }
+ db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits());
+ } else {
+ db = new DBlock(blk);
+ }
for(MLocation ml : locations) {
StorageGroup source = storages.getSource(ml);
if (source != null) {
@@ -358,9 +373,10 @@ public class Mover {
LOG.warn("Failed to get the storage policy of file " + fullPath);
return false;
}
- final List<StorageType> types = policy.chooseStorageTypes(
+ List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
+ final ECSchema ecSchema = status.getECSchema();
final LocatedBlocks locatedBlocks = status.getBlockLocations();
boolean hasRemaining = false;
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
@@ -371,10 +387,13 @@ public class Mover {
continue;
}
LocatedBlock lb = lbs.get(i);
+ if (lb.isStriped()) {
+ types = policy.chooseStorageTypes((short) lb.getLocations().length);
+ }
final StorageTypeDiff diff = new StorageTypeDiff(types,
lb.getStorageTypes());
if (!diff.removeOverlap(true)) {
- if (scheduleMoves4Block(diff, lb)) {
+ if (scheduleMoves4Block(diff, lb, ecSchema)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
}
@@ -383,10 +402,13 @@ public class Mover {
return hasRemaining;
}
- boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb,
+ ECSchema ecSchema) {
final List<MLocation> locations = MLocation.toLocations(lb);
- Collections.shuffle(locations);
- final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
+ if (!(lb instanceof LocatedStripedBlock)) {
+ Collections.shuffle(locations);
+ }
+ final DBlock db = newDBlock(lb, locations, ecSchema);
for (final StorageType t : diff.existing) {
for (final MLocation ml : locations) {
@@ -729,4 +751,4 @@ public class Mover {
System.exit(-1);
}
}
-}
\ No newline at end of file
+}
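Two details in the Mover changes deserve a note. First, for a striped block the expected storage types are chosen per block-group width, one slot per internal block (data plus parity), since the file replication factor is not meaningful for EC files. Second, the location shuffle is skipped for striped blocks: the index array built in newDBlock is positional, and getInternalBlock later maps a storage back to its index via locations.indexOf, so the order must be preserved. A compressed sketch of the striped path, paraphrasing the code above:

    if (lb.isStriped()) {
      LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
      byte[] indices = new byte[lsb.getBlockIndices().length];
      for (int i = 0; i < indices.length; i++) {
        indices[i] = (byte) lsb.getBlockIndices()[i]; // index i <-> location i
      }
      // Do NOT shuffle: indices[i] must keep describing location i.
      db = new DBlockStriped(lb.getBlock().getLocalBlock(), indices,
          (short) ecSchema.getNumDataUnits());
    }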
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
index a985dbd..0507faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@@ -91,6 +92,30 @@ public class BlocksWithLocations {
}
}
+ public static class StripedBlockWithLocations extends BlockWithLocations {
+ final byte[] indices;
+ final short dataBlockNum;
+
+ public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices,
+ short dataBlockNum) {
+ super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(),
+ blk.getStorageTypes());
+ Preconditions.checkArgument(
+ blk.getDatanodeUuids().length == indices.length);
+ this.indices = indices;
+ this.dataBlockNum = dataBlockNum;
+
+ }
+
+ public byte[] getIndices() {
+ return indices;
+ }
+
+ public short getDataBlockNum() {
+ return dataBlockNum;
+ }
+ }
+
private final BlockWithLocations[] blocks;
/** Constructor with one parameter */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index f64cf8f..e6db596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -532,6 +532,9 @@ message BlockWithLocationsProto {
repeated string datanodeUuids = 2; // Datanodes with replicas of the block
repeated string storageUuids = 3; // Storages with replicas of the block
repeated StorageTypeProto storageTypes = 4;
+
+ optional bytes indices = 5;
+ optional uint32 dataBlockNum = 6;
}
/**
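Since both new fields are optional, the message stays wire-compatible: old readers simply ignore them, and new readers branch on presence exactly as PBHelper.convert does. A minimal builder sketch, assuming the standard protobuf-java generated API (blockProto, indices, and dataBlockNum are placeholders):

    BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
        .newBuilder()
        .setBlock(blockProto);                        // pre-existing field
    builder.setIndices(ByteString.copyFrom(indices)); // new optional field 5
    builder.setDataBlockNum(dataBlockNum);            // new optional field 6
    BlockWithLocationsProto proto = builder.build();

    if (proto.hasIndices()) {
      // Only set for striped groups; readers must guard on presence.
      byte[] idx = proto.getIndices().toByteArray();
      short dataNum = (short) proto.getDataBlockNum();
    }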
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 6cd7003..db230e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1964,4 +1964,19 @@ public class DFSTestUtil {
out.flushInternal();
return out.getBlock();
}
+
+ /**
+ * Verify that blocks in striped block group are on different nodes.
+ */
+ public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
+ int groupSize) {
+ for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+ HashSet<DatanodeInfo> locs = new HashSet<>();
+ for (DatanodeInfo datanodeInfo : lb.getLocations()) {
+ locs.add(datanodeInfo);
+ }
+ assertEquals(groupSize, lb.getLocations().length);
+ assertEquals(groupSize, locs.size());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index a0b2038..3675e63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -189,40 +190,58 @@ public class TestPBHelper {
assertEquals(b, b2);
}
- private static BlockWithLocations getBlockWithLocations(int bid) {
+ private static BlockWithLocations getBlockWithLocations(
+ int bid, boolean isStriped) {
final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
final String[] storageIDs = {"s1", "s2", "s3"};
final StorageType[] storageTypes = {
StorageType.DISK, StorageType.DISK, StorageType.DISK};
- return new BlockWithLocations(new Block(bid, 0, 1),
+ final byte[] indices = {0, 1, 2};
+ final short dataBlkNum = 6;
+ BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1),
datanodeUuids, storageIDs, storageTypes);
+ if (isStriped) {
+ blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum);
+ }
+ return blkLocs;
}
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
assertEquals(locs1.getBlock(), locs2.getBlock());
assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs()));
+ if (locs1 instanceof StripedBlockWithLocations) {
+ assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(),
+ ((StripedBlockWithLocations) locs2).getIndices()));
+ }
}
@Test
public void testConvertBlockWithLocations() {
- BlockWithLocations locs = getBlockWithLocations(1);
- BlockWithLocationsProto locsProto = PBHelper.convert(locs);
- BlockWithLocations locs2 = PBHelper.convert(locsProto);
- compare(locs, locs2);
+ boolean[] testSuite = new boolean[]{false, true};
+ for (int i = 0; i < testSuite.length; i++) {
+ BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]);
+ BlockWithLocationsProto locsProto = PBHelper.convert(locs);
+ BlockWithLocations locs2 = PBHelper.convert(locsProto);
+ compare(locs, locs2);
+ }
}
@Test
public void testConvertBlocksWithLocations() {
- BlockWithLocations[] list = new BlockWithLocations[] {
- getBlockWithLocations(1), getBlockWithLocations(2) };
- BlocksWithLocations locs = new BlocksWithLocations(list);
- BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
- BlocksWithLocations locs2 = PBHelper.convert(locsProto);
- BlockWithLocations[] blocks = locs.getBlocks();
- BlockWithLocations[] blocks2 = locs2.getBlocks();
- assertEquals(blocks.length, blocks2.length);
- for (int i = 0; i < blocks.length; i++) {
- compare(blocks[i], blocks2[i]);
+ boolean[] testSuite = new boolean[]{false, true};
+ for (int i = 0; i < testSuite.length; i++) {
+ BlockWithLocations[] list = new BlockWithLocations[]{
+ getBlockWithLocations(1, testSuite[i]),
+ getBlockWithLocations(2, testSuite[i])};
+ BlocksWithLocations locs = new BlocksWithLocations(list);
+ BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
+ BlocksWithLocations locs2 = PBHelper.convert(locsProto);
+ BlockWithLocations[] blocks = locs.getBlocks();
+ BlockWithLocations[] blocks2 = locs2.getBlocks();
+ assertEquals(blocks.length, blocks2.length);
+ for (int j = 0; j < blocks.length; j++) {
+ compare(blocks[j], blocks2[j]);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 92d31d0..f6475cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -133,6 +134,21 @@ public class TestBalancer {
LazyPersistTestCase.initCacheManipulator();
}
+ int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+ int groupSize = dataBlocks + parityBlocks;
+ private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final static int stripesPerBlock = 4;
+ static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+ static void initConfWithStripe(Configuration conf) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ SimulatedFSDataset.setFactory(conf);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+ }
+
/* create a file with a length of <code>fileLen</code> */
static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
short replicationFactor, int nnIndex)
@@ -1452,6 +1468,66 @@ public class TestBalancer {
}
}
+ @Test(timeout = 100000)
+ public void testBalancerWithStripedFile() throws Exception {
+ Configuration conf = new Configuration();
+ initConfWithStripe(conf);
+ int numOfDatanodes = dataBlocks + parityBlocks + 2;
+ int numOfRacks = dataBlocks;
+ long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+ long[] capacities = new long[numOfDatanodes];
+ for (int i = 0; i < capacities.length; i++) {
+ capacities[i] = capacity;
+ }
+ String[] racks = new String[numOfDatanodes];
+ for (int i = 0; i < numOfDatanodes; i++) {
+ racks[i] = "/rack" + (i % numOfRacks);
+ }
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numOfDatanodes)
+ .racks(racks)
+ .simulatedCapacities(capacities)
+ .build();
+
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+ client.createErasureCodingZone("/", null, 0);
+
+ long totalCapacity = sum(capacities);
+
+ // fill the cluster with 30% file data; with parity it occupies 45% of raw capacity
+ long fileLen = totalCapacity * 3 / 10;
+ long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+ FileSystem fs = cluster.getFileSystem(0);
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+ // verify locations of striped blocks
+ LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+ // add one datanode
+ String newRack = "/rack" + (++numOfRacks);
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{newRack}, null, new long[]{capacity});
+ totalCapacity += capacity;
+ cluster.triggerHeartbeats();
+
+ // run balancer and validate results
+ Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+
+ // verify locations of striped blocks
+ locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
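For the capacity math in the test above, assuming the branch defaults of 6 data and 3 parity blocks: writing fileLen = 0.30 * totalCapacity of file data consumes fileLen * (6 + 3) / 6 = 0.45 * totalCapacity of raw storage, which is the 45% figure in the comment and is what totalUsedSpace computes.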
/**
* @param args
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673280df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index f4bedab..74f09fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -34,10 +35,16 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
@@ -83,7 +90,7 @@ public class TestMover {
final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
final List<MLocation> locations = MLocation.toLocations(lb);
final MLocation ml = locations.get(0);
- final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
+ final DBlock db = mover.newDBlock(lb, locations, null);
final List<StorageType> storageTypes = new ArrayList<StorageType>(
Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
@@ -361,4 +368,119 @@ public class TestMover {
cluster.shutdown();
}
}
+
+ int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+ int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+ private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+ private final static int stripesPerBlock = 4;
+ static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+ static void initConfWithStripe(Configuration conf) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+ Dispatcher.setBlockMoveWaitTime(3000L);
+ }
+
+ @Test(timeout = 300000)
+ public void testMoverWithStripedFile() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConfWithStripe(conf);
+
+ // start 10 datanodes
+ int numOfDatanodes =10;
+ int storagesPerDatanode=2;
+ long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE;
+ long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+ for (int i = 0; i < numOfDatanodes; i++) {
+ for(int j=0;j<storagesPerDatanode;j++){
+ capacities[i][j]=capacity;
+ }
+ }
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numOfDatanodes)
+ .storagesPerDatanode(storagesPerDatanode)
+ .storageTypes(new StorageType[][]{
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE}})
+ .storageCapacities(capacities)
+ .build();
+
+ try {
+ cluster.waitActive();
+
+ // set "/bar" directory with HOT storage policy.
+ ClientProtocol client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+ String barDir = "/bar";
+ client.mkdirs(barDir, new FsPermission((short) 777), true);
+ client.setStoragePolicy(barDir,
+ HdfsServerConstants.HOT_STORAGE_POLICY_NAME);
+ // set "/bar" directory with EC zone.
+ client.createErasureCodingZone(barDir, null, 0);
+
+ // write file to barDir
+ final String fooFile = "/bar/foo";
+ long fileLen = 20 * DEFAULT_STRIPE_BLOCK_SIZE ;
+ DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+ fileLen,(short) 3, 0);
+
+ // verify storage types and locations
+ LocatedBlocks locatedBlocks =
+ client.getBlockLocations(fooFile, 0, fileLen);
+ for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
+ for( StorageType type : lb.getStorageTypes()){
+ Assert.assertEquals(StorageType.DISK, type);
+ }
+ }
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ dataBlocks + parityBlocks);
+
+ // start 5 more datanodes
+ numOfDatanodes +=5;
+ capacities = new long[5][storagesPerDatanode];
+ for (int i = 0; i < 5; i++) {
+ for(int j=0;j<storagesPerDatanode;j++){
+ capacities[i][j]=capacity;
+ }
+ }
+ cluster.startDataNodes(conf, 5,
+ new StorageType[][]{
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+ true, null, null, null,capacities, null, false, false, false, null);
+ cluster.triggerHeartbeats();
+
+ // move file to ARCHIVE
+ client.setStoragePolicy(barDir, "COLD");
+ // run Mover
+ int rc = ToolRunner.run(conf, new Mover.Cli(),
+ new String[] { "-p", barDir });
+ Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+ // verify storage types and locations
+ locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
+ for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()){
+ for( StorageType type : lb.getStorageTypes()){
+ Assert.assertEquals(StorageType.ARCHIVE, type);
+ }
+ }
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+ dataBlocks + parityBlocks);
+
+ }finally{
+ cluster.shutdown();
+ }
+ }
}