You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/09/18 17:12:53 UTC
svn commit: r1524444 [1/3] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/or...
Author: szetszwo
Date: Wed Sep 18 15:12:52 2013
New Revision: 1524444
URL: http://svn.apache.org/r1524444
Log:
HDFS-4990. Change BlockPlacementPolicy to choose storages instead of datanodes.
Modified:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt Wed Sep 18 15:12:52 2013
@@ -23,3 +23,6 @@ IMPROVEMENTS:
fix a synchronization problem in DatanodeStorageInfo. (szetszwo)
HDFS-5157. Add StorageType to FsVolume. (Junping Du via szetszwo)
+
+ HDFS-4990. Change BlockPlacementPolicy to choose storages instead of
+ datanodes. (szetszwo)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Sep 18 15:12:52 2013
@@ -913,7 +913,8 @@ public class DFSOutputStream extends FSO
//get a new datanode
final DatanodeInfo[] original = nodes;
final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
- src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+ src, block, nodes, storageIDs,
+ failed.toArray(new DatanodeInfo[failed.size()]),
1, dfsClient.clientName);
nodes = lb.getLocations();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Sep 18 15:12:52 2013
@@ -131,6 +131,23 @@ public class DFSUtil {
return SECURE_RANDOM.get();
}
+ /** Shuffle the elements in the given array. */
+ public static <T> T[] shuffle(final T[] array) {
+ if (array != null && array.length > 0) {
+ final Random random = getRandom();
+ for (int n = array.length; n > 1; ) {
+ final int randomIndex = random.nextInt(n);
+ n--;
+ if (n != randomIndex) {
+ final T tmp = array[randomIndex];
+ array[randomIndex] = array[n];
+ array[n] = tmp;
+ }
+ }
+ }
+ return array;
+ }
+
/**
* Compartor for sorting DataNodeInfo[] based on decommissioned states.
* Decommissioned nodes are moved to the end of the array on sorting with
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Sep 18 15:12:52 2013
@@ -353,7 +353,8 @@ public interface ClientProtocol {
*/
@Idempotent
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
- final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+ final DatanodeInfo[] existings, final String[] existingStorageIDs,
+ final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws AccessControlException, FileNotFoundException,
SafeModeException, UnresolvedLinkException, IOException;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Sep 18 15:12:52 2013
@@ -59,18 +59,18 @@ public class LocatedBlock {
this(b, locs, null, null, startOffset, corrupt);
}
- public static LocatedBlock createLocatedBlock(ExtendedBlock b,
- DatanodeStorageInfo[] storages, long startOffset, boolean corrupt) {
- final DatanodeInfo[] locs = new DatanodeInfo[storages.length];
- final String[] storageIDs = new String[storages.length];
- final StorageType[] storageType = new StorageType[storages.length];
- for(int i = 0; i < storages.length; i++) {
- locs[i] = storages[i].getDatanodeDescriptor();
- storageIDs[i] = storages[i].getStorageID();
- storageType[i] = storages[i].getStorageType();
- }
- return new LocatedBlock(b, locs, storageIDs, storageType, startOffset, corrupt);
+ public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages) {
+ this(b, storages, -1, false); // startOffset is unknown
+ }
+
+ public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
+ long startOffset, boolean corrupt) {
+ this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
+ DatanodeStorageInfo.toStorageIDs(storages),
+ DatanodeStorageInfo.toStorageTypes(storages),
+ startOffset, corrupt); // startOffset is unknown
}
+
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
StorageType[] storageTypes, long startOffset,
boolean corrupt) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Wed Sep 18 15:12:52 2013
@@ -405,14 +405,17 @@ public class ClientNamenodeProtocolServe
throws ServiceException {
try {
List<DatanodeInfoProto> existingList = req.getExistingsList();
+ List<String> existingStorageIDsList = req.getExistingStorageIDsList();
List<DatanodeInfoProto> excludesList = req.getExcludesList();
- LocatedBlock result = server.getAdditionalDatanode(
- req.getSrc(), PBHelper.convert(req.getBlk()),
+ LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
+ PBHelper.convert(req.getBlk()),
PBHelper.convert(existingList.toArray(
new DatanodeInfoProto[existingList.size()])),
+ existingStorageIDsList.toArray(
+ new String[existingStorageIDsList.size()]),
PBHelper.convert(excludesList.toArray(
new DatanodeInfoProto[excludesList.size()])),
- req.getNumAdditionalNodes(), req.getClientName());
+ req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
PBHelper.convert(result))
.build();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Wed Sep 18 15:12:52 2013
@@ -335,7 +335,8 @@ public class ClientNamenodeProtocolTrans
@Override
public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk,
- DatanodeInfo[] existings, DatanodeInfo[] excludes,
+ DatanodeInfo[] existings, String[] existingStorageIDs,
+ DatanodeInfo[] excludes,
int numAdditionalNodes, String clientName) throws AccessControlException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
IOException {
@@ -344,6 +345,7 @@ public class ClientNamenodeProtocolTrans
.setSrc(src)
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
+ .addAllExistingStorageIDs(Arrays.asList(existingStorageIDs))
.addAllExcludes(PBHelper.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Sep 18 15:12:52 2013
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageIDsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
@@ -744,7 +745,8 @@ public class PBHelper {
for (int i = 0; i < blocks.length; i++) {
builder.addBlocks(PBHelper.convert(blocks[i]));
}
- builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
+ builder.addAllTargets(convert(cmd.getTargets()))
+ .addAllTargetStorageIDs(convert(cmd.getTargetStorageIDs()));
return builder.build();
}
@@ -757,6 +759,15 @@ public class PBHelper {
return Arrays.asList(ret);
}
+ private static List<StorageIDsProto> convert(String[][] targetStorageIDs) {
+ StorageIDsProto[] ret = new StorageIDsProto[targetStorageIDs.length];
+ for (int i = 0; i < targetStorageIDs.length; i++) {
+ ret[i] = StorageIDsProto.newBuilder()
+ .addAllStorageIDs(Arrays.asList(targetStorageIDs[i])).build();
+ }
+ return Arrays.asList(ret);
+ }
+
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
if (datanodeCommand == null) {
@@ -831,6 +842,14 @@ public class PBHelper {
for (int i = 0; i < targetList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
}
+
+ List<StorageIDsProto> targetStorageIDsList = blkCmd.getTargetStorageIDsList();
+ String[][] targetStorageIDs = new String[targetStorageIDsList.size()][];
+ for(int i = 0; i < targetStorageIDs.length; i++) {
+ List<String> storageIDs = targetStorageIDsList.get(i).getStorageIDsList();
+ targetStorageIDs[i] = storageIDs.toArray(new String[storageIDs.size()]);
+ }
+
int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkCmd.getAction()) {
case TRANSFER:
@@ -843,7 +862,8 @@ public class PBHelper {
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
}
- return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+ return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
+ targetStorageIDs);
}
public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Sep 18 15:12:52 2013
@@ -835,16 +835,6 @@ public class Balancer {
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
}
- /* Shuffle datanode array */
- static private void shuffleArray(DatanodeInfo[] datanodes) {
- for (int i=datanodes.length; i>1; i--) {
- int randomIndex = DFSUtil.getRandom().nextInt(i);
- DatanodeInfo tmp = datanodes[randomIndex];
- datanodes[randomIndex] = datanodes[i-1];
- datanodes[i-1] = tmp;
- }
- }
-
/* Given a data node set, build a network topology and decide
* over-utilized datanodes, above average utilized datanodes,
* below average utilized datanodes, and underutilized datanodes.
@@ -874,8 +864,7 @@ public class Balancer {
* an increasing order or a decreasing order.
*/
long overLoadedBytes = 0L, underLoadedBytes = 0L;
- shuffleArray(datanodes);
- for (DatanodeInfo datanode : datanodes) {
+ for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
continue; // ignore decommissioning or decommissioned nodes
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Sep 18 15:12:52 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@@ -473,8 +474,8 @@ public class BlockManager {
private void dumpBlockMeta(Block block, PrintWriter out) {
List<DatanodeDescriptor> containingNodes =
new ArrayList<DatanodeDescriptor>();
- List<DatanodeDescriptor> containingLiveReplicasNodes =
- new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> containingLiveReplicasNodes =
+ new ArrayList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
@@ -774,7 +775,7 @@ public class BlockManager {
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
- return LocatedBlock.createLocatedBlock(eb, storages, pos, false);
+ return new LocatedBlock(eb, storages, pos, false);
}
// get block locations
@@ -789,14 +790,14 @@ public class BlockManager {
final int numNodes = blocksMap.numNodes(blk);
final boolean isCorrupt = numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
- final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+ final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!isCorrupt && !replicaCorrupt))
- machines[j++] = d;
+ machines[j++] = storage;
}
}
assert j == machines.length :
@@ -1195,7 +1196,7 @@ public class BlockManager {
@VisibleForTesting
int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
int requiredReplication, numEffectiveReplicas;
- List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
+ List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
BlockCollection bc = null;
int additionalReplRequired;
@@ -1220,7 +1221,7 @@ public class BlockManager {
// get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>();
- liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1279,7 +1280,7 @@ public class BlockManager {
namesystem.writeLock();
try {
for(ReplicationWork rw : work){
- DatanodeDescriptor[] targets = rw.targets;
+ final DatanodeStorageInfo[] targets = rw.targets;
if(targets == null || targets.length == 0){
rw.targets = null;
continue;
@@ -1317,7 +1318,8 @@ public class BlockManager {
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
- if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+ if (rw.srcNode.getNetworkLocation().equals(
+ targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
continue;
}
@@ -1327,8 +1329,8 @@ public class BlockManager {
rw.srcNode.addBlockToBeReplicated(block, targets);
scheduledWork++;
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
+ for (DatanodeStorageInfo storage : targets) {
+ storage.getDatanodeDescriptor().incBlocksScheduled();
}
// Move the block-replication into a "pending" state.
@@ -1354,7 +1356,7 @@ public class BlockManager {
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
for(ReplicationWork rw : work){
- DatanodeDescriptor[] targets = rw.targets;
+ DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) {
@@ -1383,15 +1385,16 @@ public class BlockManager {
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* List, boolean, Set, long)
*/
- public DatanodeDescriptor[] chooseTarget(final String src,
+ public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
final Set<Node> excludedNodes,
final long blocksize, List<String> favoredNodes) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
- final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+ final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
- favoredDatanodeDescriptors);
+ // TODO: get storage type from file
+ favoredDatanodeDescriptors, StorageType.DEFAULT);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="
@@ -1452,12 +1455,11 @@ public class BlockManager {
* the given block
*/
@VisibleForTesting
- DatanodeDescriptor chooseSourceDatanode(
- Block block,
- List<DatanodeDescriptor> containingNodes,
- List<DatanodeDescriptor> nodesContainingLiveReplicas,
- NumberReplicas numReplicas,
- int priority) {
+ DatanodeDescriptor chooseSourceDatanode(Block block,
+ List<DatanodeDescriptor> containingNodes,
+ List<DatanodeStorageInfo> nodesContainingLiveReplicas,
+ NumberReplicas numReplicas,
+ int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
@@ -1478,7 +1480,7 @@ public class BlockManager {
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
} else {
- nodesContainingLiveReplicas.add(node);
+ nodesContainingLiveReplicas.add(storage);
live++;
}
containingNodes.add(node);
@@ -1621,7 +1623,8 @@ public class BlockManager {
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
- final DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+ final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+ LOG.info("XXX storageInfo=" + storageInfo + ", storage=" + storage);
if (namesystem.isInStartupSafeMode()
&& storageInfo.getBlockReportCount() > 0) {
blockLog.info("BLOCK* processReport: "
@@ -2636,7 +2639,7 @@ assert storedBlock.findDatanode(dn) < 0
//
// Modify the blocks->datanode map and node's map.
//
- pendingReplications.decrement(block, node);
+ pendingReplications.decrement(block, node, storageID);
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
delHintNode);
}
@@ -3225,24 +3228,24 @@ assert storedBlock.findDatanode(dn) < 0
private DatanodeDescriptor srcNode;
private List<DatanodeDescriptor> containingNodes;
- private List<DatanodeDescriptor> liveReplicaNodes;
+ private List<DatanodeStorageInfo> liveReplicaStorages;
private int additionalReplRequired;
- private DatanodeDescriptor targets[];
+ private DatanodeStorageInfo targets[];
private int priority;
public ReplicationWork(Block block,
BlockCollection bc,
DatanodeDescriptor srcNode,
List<DatanodeDescriptor> containingNodes,
- List<DatanodeDescriptor> liveReplicaNodes,
+ List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
int priority) {
this.block = block;
this.bc = bc;
this.srcNode = srcNode;
this.containingNodes = containingNodes;
- this.liveReplicaNodes = liveReplicaNodes;
+ this.liveReplicaStorages = liveReplicaStorages;
this.additionalReplRequired = additionalReplRequired;
this.priority = priority;
this.targets = null;
@@ -3251,8 +3254,8 @@ assert storedBlock.findDatanode(dn) < 0
private void chooseTargets(BlockPlacementPolicy blockplacement,
Set<Node> excludedNodes) {
targets = blockplacement.chooseTarget(bc.getName(),
- additionalReplRequired, srcNode, liveReplicaNodes, false,
- excludedNodes, block.getNumBytes());
+ additionalReplRequired, srcNode, liveReplicaStorages, false,
+ excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Wed Sep 18 15:12:52 2013
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -67,13 +68,14 @@ public abstract class BlockPlacementPoli
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
- public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+ public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
- long blocksize);
+ long blocksize,
+ StorageType storageType);
/**
* Same as {@link #chooseTarget(String, int, Node, List, boolean,
@@ -82,16 +84,19 @@ public abstract class BlockPlacementPoli
* is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes.
*/
- DatanodeDescriptor[] chooseTarget(String src,
+ DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas, Node writer,
Set<Node> excludedNodes,
- long blocksize, List<DatanodeDescriptor> favoredNodes) {
+ long blocksize,
+ List<DatanodeDescriptor> favoredNodes,
+ StorageType storageType) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
// are expected to provide this functionality
+
return chooseTarget(src, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes,
- blocksize);
+ new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
+ excludedNodes, blocksize, storageType);
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Sep 18 15:12:52 2013
@@ -29,11 +29,14 @@ import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
@@ -103,99 +106,101 @@ public class BlockPlacementPolicyDefault
}
@Override
- public DatanodeDescriptor[] chooseTarget(String srcPath,
+ public DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosenNodes,
boolean returnChosenNodes,
Set<Node> excludedNodes,
- long blocksize) {
+ long blocksize,
+ StorageType storageType) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
- excludedNodes, blocksize);
+ excludedNodes, blocksize, storageType);
}
@Override
- DatanodeDescriptor[] chooseTarget(String src,
+ DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas,
Node writer,
Set<Node> excludedNodes,
long blocksize,
- List<DatanodeDescriptor> favoredNodes) {
+ List<DatanodeDescriptor> favoredNodes,
+ StorageType storageType) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
return chooseTarget(src, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
- excludedNodes, blocksize);
+ new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
+ excludedNodes, blocksize, storageType);
}
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
new HashSet<Node>() : new HashSet<Node>(excludedNodes);
// Choose favored nodes
- List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+ List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode
- DatanodeDescriptor target = chooseLocalNode(favoredNode,
+ final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize,
- getMaxNodesPerRack(results,
- numOfReplicas)[1], results, avoidStaleNodes);
+ getMaxNodesPerRack(results.size(), numOfReplicas)[1],
+ results, avoidStaleNodes, storageType);
if (target == null) {
LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode);
continue;
}
- favoriteAndExcludedNodes.add(target);
+ favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
}
if (results.size() < numOfReplicas) {
// Not enough favored nodes, choose other nodes.
numOfReplicas -= results.size();
- DatanodeDescriptor[] remainingTargets =
+ DatanodeStorageInfo[] remainingTargets =
chooseTarget(src, numOfReplicas, writer, results,
- false, favoriteAndExcludedNodes, blocksize);
+ false, favoriteAndExcludedNodes, blocksize, storageType);
for (int i = 0; i < remainingTargets.length; i++) {
results.add(remainingTargets[i]);
}
}
return getPipeline(writer,
- results.toArray(new DatanodeDescriptor[results.size()]));
+ results.toArray(new DatanodeStorageInfo[results.size()]));
} catch (NotEnoughReplicasException nr) {
// Fall back to regular block placement disregarding favored nodes hint
return chooseTarget(src, numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
- excludedNodes, blocksize);
+ new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
+ excludedNodes, blocksize, storageType);
}
}
/** This is the implementation. */
- private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
- List<DatanodeDescriptor> chosenNodes,
+ List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
- long blocksize) {
+ long blocksize,
+ StorageType storageType) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return DatanodeDescriptor.EMPTY_ARRAY;
+ return DatanodeStorageInfo.EMPTY_ARRAY;
}
if (excludedNodes == null) {
excludedNodes = new HashSet<Node>();
}
- int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
+ int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
- List<DatanodeDescriptor> results =
- new ArrayList<DatanodeDescriptor>(chosenNodes);
- for (DatanodeDescriptor node:chosenNodes) {
+ final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
+ for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
- addToExcludedNodes(node, excludedNodes);
+ addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
if (!clusterMap.contains(writer)) {
@@ -205,20 +210,19 @@ public class BlockPlacementPolicyDefault
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
Node localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
if (!returnChosenNodes) {
- results.removeAll(chosenNodes);
+ results.removeAll(chosenStorage);
}
// sorting nodes to form a pipeline
return getPipeline((writer==null)?localNode:writer,
- results.toArray(new DatanodeDescriptor[results.size()]));
+ results.toArray(new DatanodeStorageInfo[results.size()]));
}
- private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
- int numOfReplicas) {
+ private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves();
- int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+ int totalNumOfReplicas = numOfChosen + numOfReplicas;
if (totalNumOfReplicas > clusterSize) {
numOfReplicas -= (totalNumOfReplicas-clusterSize);
totalNumOfReplicas = clusterSize;
@@ -243,8 +247,9 @@ public class BlockPlacementPolicyDefault
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- final boolean avoidStaleNodes) {
+ List<DatanodeStorageInfo> results,
+ final boolean avoidStaleNodes,
+ StorageType storageType) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@@ -253,7 +258,7 @@ public class BlockPlacementPolicyDefault
int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
- writer = results.get(0);
+ writer = results.get(0).getDatanodeDescriptor();
}
// Keep a copy of original excludedNodes
@@ -261,42 +266,49 @@ public class BlockPlacementPolicyDefault
new HashSet<Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
- writer = chooseLocalNode(writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageType)
+ .getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
}
}
+ final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
- chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes, storageType);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
- if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack,
- results, avoidStaleNodes);
+ final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
+ if (clusterMap.isOnSameRack(dn0, dn1)) {
+ chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes, storageType);
} else if (newBlock){
- chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes, storageType);
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes);
+ results, avoidStaleNodes, storageType);
}
if (--numOfReplicas == 0) {
return writer;
}
}
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
- LOG.warn("Not able to place enough replicas, still in need of "
- + (totalReplicasExpected - results.size()) + " to reach "
- + totalReplicasExpected + "\n"
- + e.getMessage());
+ final String message = "Failed to place enough replicas, still in need of "
+ + (totalReplicasExpected - results.size()) + " to reach "
+ + totalReplicasExpected + ".";
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(message, e);
+ } else {
+ LOG.warn(message + " " + e.getMessage());
+ }
+
if (avoidStaleNodes) {
// Retry chooseTarget again, this time not avoiding stale nodes.
@@ -304,14 +316,14 @@ public class BlockPlacementPolicyDefault
// not chosen because they were stale, decommissioned, etc.
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
- for (Node node : results) {
- oldExcludedNodes.add(node);
+ for (DatanodeStorageInfo resultStorage : results) {
+ oldExcludedNodes.add(resultStorage.getDatanodeDescriptor());
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
- maxNodesPerRack, results, false);
+ maxNodesPerRack, results, false, storageType);
}
}
return writer;
@@ -321,32 +333,36 @@ public class BlockPlacementPolicyDefault
* Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
- * @return the chosen node
+ * @return the chosen storage
*/
- protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list
- if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
- maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
- return localDatanode;
+ for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+ localDatanode.getStorageInfos())) {
+ if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+ maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+ return localStorage;
+ }
}
}
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
/**
@@ -368,27 +384,29 @@ public class BlockPlacementPolicyDefault
* in the cluster.
* @return the chosen node
*/
- protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local rack
try {
return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
- for(DatanodeDescriptor nextNode : results) {
+ for(DatanodeStorageInfo resultStorage : results) {
+ DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@@ -397,16 +415,16 @@ public class BlockPlacementPolicyDefault
if (newLocal != null) {
try {
return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
@@ -423,48 +441,51 @@ public class BlockPlacementPolicyDefault
Set<Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results,
- avoidStaleNodes);
+ avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results, avoidStaleNodes);
+ maxReplicasPerRack, results, avoidStaleNodes, storageType);
}
}
/**
* Randomly choose one target from the given <i>scope</i>.
- * @return the chosen node, if there is any.
+ * @return the chosen storage, if there is any.
*/
- protected DatanodeDescriptor chooseRandom(String scope,
+ protected DatanodeStorageInfo chooseRandom(String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType)
throws NotEnoughReplicasException {
return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes);
+ results, avoidStaleNodes, storageType);
}
/**
* Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
* @return the first chosen node, if there is any.
*/
- protected DatanodeDescriptor chooseRandom(int numOfReplicas,
+ protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes)
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType)
throws NotEnoughReplicasException {
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
@@ -475,24 +496,28 @@ public class BlockPlacementPolicyDefault
builder.setLength(0);
builder.append("[");
}
- boolean badTarget = false;
- DatanodeDescriptor firstChosen = null;
+ boolean goodTarget = false;
+ DatanodeStorageInfo firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
if (excludedNodes.add(chosenNode)) { //was not in the excluded list
numOfAvailableNodes--;
- int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
- blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
- if (newExcludedNodes >= 0) {
- numOfReplicas--;
- if (firstChosen == null) {
- firstChosen = chosenNode;
+ final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
+ chosenNode.getStorageInfos());
+ for(int i = 0; i < storages.length && !goodTarget; i++) {
+ final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+ excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+ avoidStaleNodes, storageType);
+ goodTarget = newExcludedNodes >= 0;
+ if (goodTarget) {
+ numOfReplicas--;
+ if (firstChosen == null) {
+ firstChosen = storages[i];
+ }
+ numOfAvailableNodes -= newExcludedNodes;
}
- numOfAvailableNodes -= newExcludedNodes;
- } else {
- badTarget = true;
}
}
}
@@ -500,7 +525,7 @@ public class BlockPlacementPolicyDefault
if (numOfReplicas>0) {
String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) {
- if (badTarget && builder != null) {
+ if (!goodTarget && builder != null) {
detail = builder.append("]").toString();
builder.setLength(0);
} else detail = "";
@@ -512,43 +537,46 @@ public class BlockPlacementPolicyDefault
}
/**
- * If the given node is a good target, add it to the result list and
+ * If the given storage is a good target, add it to the result list and
* update the set of excluded nodes.
* @return -1 if the given is not a good target;
* otherwise, return the number of nodes added to excludedNodes set.
*/
- int addIfIsGoodTarget(DatanodeDescriptor node,
+ int addIfIsGoodTarget(DatanodeStorageInfo storage,
Set<Node> excludedNodes,
long blockSize,
int maxNodesPerRack,
boolean considerLoad,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes) {
- if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
- results, avoidStaleNodes)) {
- results.add(node);
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType) {
+ if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
+ results, avoidStaleNodes, storageType)) {
+ results.add(storage);
// add node and related nodes to excludedNode
- return addToExcludedNodes(node, excludedNodes);
+ return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
} else {
return -1;
}
}
- private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
+ private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
if (LOG.isDebugEnabled()) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
// build the error message for later use.
debugLoggingBuilder.get()
.append(node).append(": ")
- .append("Node ").append(NodeBase.getPath(node))
+ .append("Storage ").append(storage)
+ .append("at node ").append(NodeBase.getPath(node))
.append(" is not chosen because ")
.append(reason);
}
}
/**
- * Determine if a node is a good target.
+ * Determine if a storage is a good target.
*
- * @param node The target node
+ * @param storage The target storage
* @param blockSize Size of block
* @param maxTargetPerRack Maximum number of targets per rack. The value of
* this parameter depends on the number of racks in
@@ -561,29 +589,47 @@ public class BlockPlacementPolicyDefault
* does not have too much load,
* and the rack does not have too many nodes.
*/
- private boolean isGoodTarget(DatanodeDescriptor node,
+ private boolean isGoodTarget(DatanodeStorageInfo storage,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
- List<DatanodeDescriptor> results,
- boolean avoidStaleNodes) {
- // check if the node is (being) decommissed
+ List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes,
+ StorageType storageType) {
+ if (storage.getStorageType() != storageType) {
+ logNodeIsNotChosen(storage,
+ "storage types do not match, where the expected storage type is "
+ + storageType);
+ return false;
+ }
+ if (storage.getState() == State.READ_ONLY) {
+ logNodeIsNotChosen(storage, "storage is read-only");
+ return false;
+ }
+ DatanodeDescriptor node = storage.getDatanodeDescriptor();
+ // check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+ logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
- logNodeIsNotChosen(node, "the node is stale ");
+ logNodeIsNotChosen(storage, "the node is stale ");
return false;
}
}
+ final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
+ if (requiredSize > storage.getRemaining()) {
+ logNodeIsNotChosen(storage, "the storage does not have enough space ");
+ return false;
+ }
+ //TODO: move getBlocksScheduled() to DatanodeStorageInfo.
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
- if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- logNodeIsNotChosen(node, "the node does not have enough space ");
+ if (requiredSize > remaining) {
+ logNodeIsNotChosen(storage, "the node does not have enough space ");
return false;
}
@@ -595,7 +641,7 @@ public class BlockPlacementPolicyDefault
avgLoad = (double)stats.getTotalLoad()/size;
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logNodeIsNotChosen(node, "the node is too busy ");
+ logNodeIsNotChosen(storage, "the node is too busy ");
return false;
}
}
@@ -603,13 +649,14 @@ public class BlockPlacementPolicyDefault
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
- for(Node result : results) {
- if (rackname.equals(result.getNetworkLocation())) {
+ for(DatanodeStorageInfo resultStorage : results) {
+ if (rackname.equals(
+ resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
- logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
+ logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
return false;
}
return true;
@@ -621,37 +668,40 @@ public class BlockPlacementPolicyDefault
* starts from the writer and traverses all <i>nodes</i>
* This is basically a traveling salesman problem.
*/
- private DatanodeDescriptor[] getPipeline(Node writer,
- DatanodeDescriptor[] nodes) {
- if (nodes.length==0) return nodes;
-
+ private DatanodeStorageInfo[] getPipeline(Node writer,
+ DatanodeStorageInfo[] storages) {
+ if (storages.length == 0) {
+ return storages;
+ }
+
synchronized(clusterMap) {
int index=0;
if (writer == null || !clusterMap.contains(writer)) {
- writer = nodes[0];
+ writer = storages[0].getDatanodeDescriptor();
}
- for(;index<nodes.length; index++) {
- DatanodeDescriptor shortestNode = nodes[index];
- int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+ for(; index < storages.length; index++) {
+ DatanodeStorageInfo shortestStorage = storages[index];
+ int shortestDistance = clusterMap.getDistance(writer,
+ shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
- for(int i=index+1; i<nodes.length; i++) {
- DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance(writer, currentNode);
+ for(int i = index + 1; i < storages.length; i++) {
+ int currentDistance = clusterMap.getDistance(writer,
+ storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
- shortestNode = currentNode;
+ shortestStorage = storages[i];
shortestIndex = i;
}
}
//switch position index & shortestIndex
if (index != shortestIndex) {
- nodes[shortestIndex] = nodes[index];
- nodes[index] = shortestNode;
+ storages[shortestIndex] = storages[index];
+ storages[index] = shortestStorage;
}
- writer = shortestNode;
+ writer = shortestStorage.getDatanodeDescriptor();
}
}
- return nodes;
+ return storages;
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Wed Sep 18 15:12:52 2013
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
@@ -64,81 +66,87 @@ public class BlockPlacementPolicyWithNod
* @return the chosen node
*/
@Override
- protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ // otherwise try local machine first
if (localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
- // otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list
- if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
- maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
- return localDataNode;
+ for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+ localDataNode.getStorageInfos())) {
+ if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+ maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+ return localStorage;
+ }
}
}
}
// try a node on local node group
- DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+ DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
- if (chosenNode != null) {
- return chosenNode;
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ if (chosenStorage != null) {
+ return chosenStorage;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+ }
+
+ /** @return the node of the second replica */
+ private static DatanodeDescriptor secondNode(Node localMachine,
+ List<DatanodeStorageInfo> results) {
+ // find the second replica
+ for(DatanodeStorageInfo nextStorage : results) {
+ DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
+ if (nextNode != localMachine) {
+ return nextNode;
+ }
+ }
+ return null;
}
-
@Override
- protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+ protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results,
- avoidStaleNodes);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local rack, but off-nodegroup
try {
- return chooseRandom(NetworkTopology.getFirstHalf(
- localMachine.getNetworkLocation()),
- excludedNodes, blocksize,
- maxNodesPerRack, results,
- avoidStaleNodes);
+ final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
+ return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
+ results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
- DatanodeDescriptor newLocal=null;
- for(DatanodeDescriptor nextNode : results) {
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
+ final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
@@ -146,8 +154,9 @@ public class BlockPlacementPolicyWithNod
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine, Set<Node> excludedNodes,
- long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
- boolean avoidStaleNodes) throws NotEnoughReplicasException {
+ long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
+ boolean avoidStaleNodes, StorageType storageType)
+ throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
final String rackLocation = NetworkTopology.getFirstHalf(
@@ -155,12 +164,12 @@ public class BlockPlacementPolicyWithNod
try {
// randomly choose from remote racks
chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
- maxReplicasPerRack, results, avoidStaleNodes);
+ maxReplicasPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
// fall back to the local rack
chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
rackLocation, excludedNodes, blocksize,
- maxReplicasPerRack, results, avoidStaleNodes);
+ maxReplicasPerRack, results, avoidStaleNodes, storageType);
}
}
@@ -170,46 +179,40 @@ public class BlockPlacementPolicyWithNod
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
- private DatanodeDescriptor chooseLocalNodeGroup(
+ private DatanodeStorageInfo chooseLocalNodeGroup(
NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
- List<DatanodeDescriptor> results, boolean avoidStaleNodes)
- throws NotEnoughReplicasException {
+ List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+ StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local node group
try {
return chooseRandom(
clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
- excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+ excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
+ storageType);
} catch (NotEnoughReplicasException e1) {
- // find the second replica
- DatanodeDescriptor newLocal=null;
- for(DatanodeDescriptor nextNode : results) {
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
+ final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results,
- avoidStaleNodes);
+ avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes);
+ maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Sep 18 15:12:52 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.bl
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -54,9 +55,9 @@ public class DatanodeDescriptor extends
@InterfaceStability.Evolving
public static class BlockTargetPair {
public final Block block;
- public final DatanodeDescriptor[] targets;
+ public final DatanodeStorageInfo[] targets;
- BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+ BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
this.block = block;
this.targets = targets;
}
@@ -215,14 +216,15 @@ public class DatanodeDescriptor extends
return false;
}
- public DatanodeStorageInfo getStorageInfo(String storageID) {
+ DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {
return storageMap.get(storageID);
}
}
- public Collection<DatanodeStorageInfo> getStorageInfos() {
+ DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
- return new ArrayList<DatanodeStorageInfo>(storageMap.values());
+ final Collection<DatanodeStorageInfo> storages = storageMap.values();
+ return storages.toArray(new DatanodeStorageInfo[storages.size()]);
}
}
@@ -255,14 +257,6 @@ public class DatanodeDescriptor extends
}
/**
- * Used for testing only
- * @return the head of the blockList
- */
- protected BlockInfo getHead(){
- return getBlockIterator().next();
- }
-
- /**
* Replace specified old block with a new one in the DataNodeDescriptor.
*
* @param oldBlock - block to be replaced
@@ -325,20 +319,15 @@ public class DatanodeDescriptor extends
}
private static class BlockIterator implements Iterator<BlockInfo> {
- private final int maxIndex;
private int index = 0;
- private List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
+ private final List<Iterator<BlockInfo>> iterators;
- private BlockIterator(final Iterable<DatanodeStorageInfo> storages) {
+ private BlockIterator(final DatanodeStorageInfo... storages) {
+ List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator());
}
- maxIndex = iterators.size() - 1;
- }
-
- private BlockIterator(final DatanodeStorageInfo storage) {
- iterators.add(storage.getBlockIterator());
- maxIndex = iterators.size() - 1;
+ this.iterators = Collections.unmodifiableList(iterators);
}
@Override
@@ -359,7 +348,7 @@ public class DatanodeDescriptor extends
}
private void update() {
- while(index < maxIndex && !iterators.get(index).hasNext()) {
+ while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
index++;
}
}
@@ -375,7 +364,7 @@ public class DatanodeDescriptor extends
/**
* Store block replication work.
*/
- void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+ void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(new BlockTargetPair(block, targets));
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Wed Sep 18 15:12:52 2013
@@ -32,7 +32,9 @@ import org.apache.hadoop.hdfs.server.pro
* by this class.
*/
public class DatanodeStorageInfo {
- static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
+ public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
+
+ public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
return toDatanodeInfos(Arrays.asList(storages));
}
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
@@ -43,6 +45,22 @@ public class DatanodeStorageInfo {
return datanodes;
}
+ public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
+ String[] storageIDs = new String[storages.length];
+ for(int i = 0; i < storageIDs.length; i++) {
+ storageIDs[i] = storages[i].getStorageID();
+ }
+ return storageIDs;
+ }
+
+ public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
+ StorageType[] storageTypes = new StorageType[storages.length];
+ for(int i = 0; i < storageTypes.length; i++) {
+ storageTypes[i] = storages[i].getStorageType();
+ }
+ return storageTypes;
+ }
+
/**
* Iterates over the list of blocks belonging to the data-node.
*/
@@ -208,6 +226,22 @@ public class DatanodeStorageInfo {
}
@Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (obj == null || !(obj instanceof DatanodeStorageInfo)) {
+ return false;
+ }
+ final DatanodeStorageInfo that = (DatanodeStorageInfo)obj;
+ return this.storageID.equals(that.storageID);
+ }
+
+ @Override
+ public int hashCode() {
+ return storageID.hashCode();
+ }
+
+ @Override
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1524444&r1=1524443&r2=1524444&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Wed Sep 18 15:12:52 2013
@@ -76,7 +76,7 @@ class PendingReplicationBlocks {
* @param block The corresponding block
* @param targets The storages (on their DataNodes) where replicas of the block should be placed
*/
- void increment(Block block, DatanodeDescriptor[] targets) {
+ void increment(Block block, DatanodeStorageInfo[] targets) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found == null) {
@@ -95,14 +95,14 @@ class PendingReplicationBlocks {
*
* @param dn the DataNode that finishes the replication (its storage identified by storageID)
*/
- void decrement(Block block, DatanodeDescriptor dn) {
+ void decrement(Block block, DatanodeDescriptor dn, String storageID) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block);
}
- found.decrementReplicas(dn);
+ found.decrementReplicas(dn.getStorageInfo(storageID));
if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block);
}
@@ -174,12 +174,12 @@ class PendingReplicationBlocks {
*/
static class PendingBlockInfo {
private long timeStamp;
- private final List<DatanodeDescriptor> targets;
+ private final List<DatanodeStorageInfo> targets;
- PendingBlockInfo(DatanodeDescriptor[] targets) {
+ PendingBlockInfo(DatanodeStorageInfo[] targets) {
this.timeStamp = now();
- this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
- : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
+ this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
+ : new ArrayList<DatanodeStorageInfo>(Arrays.asList(targets));
}
long getTimeStamp() {
@@ -190,16 +190,16 @@ class PendingReplicationBlocks {
timeStamp = now();
}
- void incrementReplicas(DatanodeDescriptor... newTargets) {
+ void incrementReplicas(DatanodeStorageInfo... newTargets) {
if (newTargets != null) {
- for (DatanodeDescriptor dn : newTargets) {
+ for (DatanodeStorageInfo dn : newTargets) {
targets.add(dn);
}
}
}
- void decrementReplicas(DatanodeDescriptor dn) {
- targets.remove(dn);
+ void decrementReplicas(DatanodeStorageInfo storage) {
+ targets.remove(storage);
}
int getNumReplicas() {