You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2014/07/23 19:25:06 UTC

svn commit: r1612880 - in /hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Author: szetszwo
Date: Wed Jul 23 17:25:06 2014
New Revision: 1612880

URL: http://svn.apache.org/r1612880
Log:
HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage types are unavailable.

Modified:
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1612880&r1=1612879&r2=1612880&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jul 23 17:25:06 2014
@@ -17,6 +17,9 @@ HDFS-6584: Archival Storage
     HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files.
     (vinayakumarb via szetszwo)
 
+    HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage
+    types are unavailable.  (szetszwo)
+
 Trunk (Unreleased)
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1612880&r1=1612879&r2=1612880&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Jul 23 17:25:06 2014
@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Tim
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -218,7 +219,8 @@ public class BlockPlacementPolicyDefault
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
     final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
+        EnumSet.noneOf(StorageType.class), results.isEmpty());
     if (!returnChosenNodes) {  
       results.removeAll(chosenStorage);
     }
@@ -238,7 +240,40 @@ public class BlockPlacementPolicyDefault
     int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
-    
+
+  private static List<StorageType> selectStorageTypes( // pick storage types, swapping unavailable ones for fallbacks
+      final BlockStoragePolicy storagePolicy, // supplies the candidate types and the fallback types
+      final short replication, // forwarded to chooseStorageTypes; also the warning threshold below
+      final Iterable<StorageType> chosen, // forwarded to chooseStorageTypes unchanged
+      final EnumSet<StorageType> unavailableStorages, // types that must be replaced or dropped
+      final boolean isNewBlock) { // true: use the creation fallback; false: use the replication fallback
+    final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
+        replication, chosen);
+    final List<StorageType> removed = new ArrayList<StorageType>(); // dropped types; only reported in the warning
+    for(int i = storageTypes.size() - 1; i >= 0; i--) { // walk backwards so remove(i) never shifts an unvisited index
+      // Replace each unavailable storage type with the policy's fallback, or remove it when no fallback exists.
+      final StorageType t = storageTypes.get(i);
+      if (unavailableStorages.contains(t)) {
+        final StorageType fallback = isNewBlock? // lookup takes only unavailableStorages as input, not t
+            storagePolicy.getCreationFallback(unavailableStorages)
+            : storagePolicy.getReplicationFallback(unavailableStorages);
+        if (fallback == null) {
+          removed.add(storageTypes.remove(i)); // no fallback: drop this slot and remember what was dropped
+        } else {
+          storageTypes.set(i, fallback); // substitute in place, keeping the slot's position in the list
+        }
+      }
+    }
+    if (storageTypes.size() < replication) { // NOTE(review): if chooseStorageTypes omits already-chosen types this also fires for non-empty chosen - confirm
+      LOG.warn("Failed to place enough replicas: replication is " + replication
+          + " but only " + storageTypes.size() + " storage types can be selected "
+          + "(selected=" + storageTypes
+          + ", unavailable=" + unavailableStorages
+          + ", removed=" + removed
+          + ", policy=" + storagePolicy + ")");
+    }
+    return storageTypes; // the list obtained from chooseStorageTypes, edited in place (so it must be mutable); may be shorter than replication
+  }
   /**
    * choose <i>numOfReplicas</i> from all data nodes
    * @param numOfReplicas additional number of replicas wanted
@@ -257,14 +292,14 @@ public class BlockPlacementPolicyDefault
                             final int maxNodesPerRack,
                             final List<DatanodeStorageInfo> results,
                             final boolean avoidStaleNodes,
-                            final BlockStoragePolicy storagePolicy) {
+                            final BlockStoragePolicy storagePolicy,
+                            final EnumSet<StorageType> unavailableStorages,
+                            final boolean newBlock) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
-    int totalReplicasExpected = numOfReplicas + results.size();
-      
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
+    final int numOfResults = results.size();
+    final int totalReplicasExpected = numOfReplicas + numOfResults;
     if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
       writer = results.get(0).getDatanodeDescriptor();
     }
@@ -272,12 +307,25 @@ public class BlockPlacementPolicyDefault
     // Keep a copy of original excludedNodes
     final Set<Node> oldExcludedNodes = avoidStaleNodes ? 
         new HashSet<Node>(excludedNodes) : null;
-    final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
-        (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results));
+
+    // choose storage types; use fallbacks for unavailable storages
+    final List<StorageType> storageTypes = selectStorageTypes(storagePolicy,
+        (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results),
+        unavailableStorages, newBlock);
+
+    StorageType curStorageType = null;
     try {
+      if ((numOfReplicas = storageTypes.size()) == 0) {
+        throw new NotEnoughReplicasException(
+            "All required storage types are unavailable: "
+            + " unavailableStorages=" + unavailableStorages
+            + ", storagePolicy=" + storagePolicy);
+      }
+
       if (numOfResults == 0) {
+        curStorageType = storageTypes.remove(0);
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true)
+            maxNodesPerRack, results, avoidStaleNodes, curStorageType, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -285,30 +333,33 @@ public class BlockPlacementPolicyDefault
       }
       final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
+        curStorageType = storageTypes.remove(0);
         chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-            results, avoidStaleNodes, storageTypes.remove(0));
+            results, avoidStaleNodes, curStorageType);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 2) {
         final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
+        curStorageType = storageTypes.remove(0);
         if (clusterMap.isOnSameRack(dn0, dn1)) {
           chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageTypes.remove(0));
+              results, avoidStaleNodes, curStorageType);
         } else if (newBlock){
           chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageTypes.remove(0));
+              results, avoidStaleNodes, curStorageType);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageTypes.remove(0));
+              results, avoidStaleNodes, curStorageType);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
+      curStorageType = storageTypes.remove(0);
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0));
+          maxNodesPerRack, results, avoidStaleNodes, curStorageType);
     } catch (NotEnoughReplicasException e) {
       final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
@@ -333,7 +384,16 @@ public class BlockPlacementPolicyDefault
         // if the NotEnoughReplicasException was thrown in chooseRandom().
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
-            maxNodesPerRack, results, false, storagePolicy);
+            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+            newBlock);
+      }
+
+      if (storageTypes.size() > 0) {
+        // Retry chooseTarget with fallback storage types
+        unavailableStorages.add(curStorageType);
+        return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+            newBlock);
       }
     }
     return writer;