You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/07/23 19:25:06 UTC
svn commit: r1612880 - in
/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs:
CHANGES.txt
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
Author: szetszwo
Date: Wed Jul 23 17:25:06 2014
New Revision: 1612880
URL: http://svn.apache.org/r1612880
Log:
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage types are unavailable.
Modified:
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1612880&r1=1612879&r2=1612880&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jul 23 17:25:06 2014
@@ -17,6 +17,9 @@ HDFS-6584: Archival Storage
HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files.
(vinayakumarb via szetszwo)
+ HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
+ types are unavailable. (szetszwo)
+
Trunk (Unreleased)
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1612880&r1=1612879&r2=1612880&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Jul 23 17:25:06 2014
@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Tim
import java.util.ArrayList;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -218,7 +219,8 @@ public class BlockPlacementPolicyDefault
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
- blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy);
+ blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
+ EnumSet.noneOf(StorageType.class), results.isEmpty());
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
@@ -238,7 +240,40 @@ public class BlockPlacementPolicyDefault
int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
return new int[] {numOfReplicas, maxNodesPerRack};
}
-
+
+  /**
+   * Select the storage types for the replicas that still need to be placed,
+   * substituting fallbacks for storage types that are currently unavailable.
+   *
+   * @param storagePolicy the policy that picks storage types and fallbacks.
+   * @param replication the total number of replicas expected, including the
+   *                    ones already chosen.
+   * @param chosen the storage types of the replicas already chosen.
+   * @param unavailableStorages storage types that must not be selected.
+   * @param isNewBlock if true, use the policy's creation fallback; otherwise
+   *                   use its replication fallback.
+   * @return the selected storage types. Each unavailable type is replaced by
+   *         the fallback, or dropped when the policy offers no fallback, so
+   *         the list may be shorter than what the policy originally asked for.
+   */
+  private static List<StorageType> selectStorageTypes(
+      final BlockStoragePolicy storagePolicy,
+      final short replication,
+      final Iterable<StorageType> chosen,
+      final EnumSet<StorageType> unavailableStorages,
+      final boolean isNewBlock) {
+    final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
+        replication, chosen);
+    // The caller uses the size of this list as the number of replicas still
+    // to place, not the total replication factor. Compare against the size
+    // the policy requested rather than against 'replication'; otherwise the
+    // warning below fires spuriously whenever some replicas already exist.
+    final int expectedSize = storageTypes.size();
+    final List<StorageType> removed = new ArrayList<StorageType>();
+    // Walk backwards so that remove(i) never shifts an unvisited entry.
+    for(int i = storageTypes.size() - 1; i >= 0; i--) {
+      // replace/remove unavailable storage types.
+      final StorageType t = storageTypes.get(i);
+      if (unavailableStorages.contains(t)) {
+        final StorageType fallback = isNewBlock?
+            storagePolicy.getCreationFallback(unavailableStorages)
+            : storagePolicy.getReplicationFallback(unavailableStorages);
+        if (fallback == null) {
+          removed.add(storageTypes.remove(i));
+        } else {
+          storageTypes.set(i, fallback);
+        }
+      }
+    }
+    // Only dropped entries (no fallback available) shrink the list.
+    if (storageTypes.size() < expectedSize) {
+      LOG.warn("Failed to place enough replicas: expected size is " + expectedSize
+          + " but only " + storageTypes.size() + " storage types can be selected "
+          + "(replication=" + replication
+          + ", selected=" + storageTypes
+          + ", unavailable=" + unavailableStorages
+          + ", removed=" + removed
+          + ", policy=" + storagePolicy + ")");
+    }
+    return storageTypes;
+  }
/**
* choose <i>numOfReplicas</i> from all data nodes
* @param numOfReplicas additional number of replicas wanted
@@ -257,14 +292,14 @@ public class BlockPlacementPolicyDefault
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
- final BlockStoragePolicy storagePolicy) {
+ final BlockStoragePolicy storagePolicy,
+ final EnumSet<StorageType> unavailableStorages,
+ final boolean newBlock) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
- int totalReplicasExpected = numOfReplicas + results.size();
-
- int numOfResults = results.size();
- boolean newBlock = (numOfResults==0);
+ final int numOfResults = results.size();
+ final int totalReplicasExpected = numOfReplicas + numOfResults;
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0).getDatanodeDescriptor();
}
@@ -272,12 +307,25 @@ public class BlockPlacementPolicyDefault
// Keep a copy of original excludedNodes
final Set<Node> oldExcludedNodes = avoidStaleNodes ?
new HashSet<Node>(excludedNodes) : null;
- final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
- (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results));
+
+ // choose storage types; use fallbacks for unavailable storages
+ final List<StorageType> storageTypes = selectStorageTypes(storagePolicy,
+ (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results),
+ unavailableStorages, newBlock);
+
+ StorageType curStorageType = null;
try {
+ if ((numOfReplicas = storageTypes.size()) == 0) {
+ throw new NotEnoughReplicasException(
+ "All required storage types are unavailable: "
+ + " unavailableStorages=" + unavailableStorages
+ + ", storagePolicy=" + storagePolicy);
+ }
+
if (numOfResults == 0) {
+ curStorageType = storageTypes.remove(0);
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true)
+ maxNodesPerRack, results, avoidStaleNodes, curStorageType, true)
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
@@ -285,30 +333,33 @@ public class BlockPlacementPolicyDefault
}
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
+ curStorageType = storageTypes.remove(0);
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes, storageTypes.remove(0));
+ results, avoidStaleNodes, curStorageType);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
+ curStorageType = storageTypes.remove(0);
if (clusterMap.isOnSameRack(dn0, dn1)) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes, storageTypes.remove(0));
+ results, avoidStaleNodes, curStorageType);
} else if (newBlock){
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes, storageTypes.remove(0));
+ results, avoidStaleNodes, curStorageType);
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
- results, avoidStaleNodes, storageTypes.remove(0));
+ results, avoidStaleNodes, curStorageType);
}
if (--numOfReplicas == 0) {
return writer;
}
}
+ curStorageType = storageTypes.remove(0);
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0));
+ maxNodesPerRack, results, avoidStaleNodes, curStorageType);
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
@@ -333,7 +384,16 @@ public class BlockPlacementPolicyDefault
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
- maxNodesPerRack, results, false, storagePolicy);
+ maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+ newBlock);
+ }
+
+ if (storageTypes.size() > 0) {
+ // Retry chooseTarget with fallback storage types
+ unavailableStorages.add(curStorageType);
+ return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize,
+ maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+ newBlock);
}
}
return writer;