Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2013/07/16 00:20:00 UTC

svn commit: r1503497 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/net/

Author: cmccabe
Date: Mon Jul 15 22:19:59 2013
New Revision: 1503497

URL: http://svn.apache.org/r1503497
Log:
HDFS-4521.  Invalid Network Topologies should not be cached (Junping Du via Colin Patrick McCabe)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/DNSToSwitchMapping.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NodeBase.java
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/ScriptBasedMapping.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/StaticMapping.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopology.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jul 15 22:19:59 2013
@@ -74,6 +74,9 @@ Release 1.3.0 - unreleased
     HADOOP-9307. BufferedFSInputStream.read returns wrong results after certain
     seeks. (todd)
 
+    HDFS-4521.  Invalid Network Topologies should not be cached.
+    (Backported by Junping Du, reviewed by Colin Patrick McCabe.)
+
 Release 1.2.1 - 2013.07.06
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/CachedDNSToSwitchMapping.java Mon Jul 15 22:19:59 2013
@@ -98,6 +98,11 @@ public class CachedDNSToSwitchMapping im
     List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
     this.cacheResolvedHosts(uncachedHosts, resolvedHosts);
     return this.getCachedHosts(names);
-
   }
+  
+  @Override
+  public void reloadCachedMappings() {
+    cache.clear();
+  }
+  
 }
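
In practice the new method gives callers a way to invalidate stale results. A
minimal usage sketch (assuming the class's existing one-argument constructor;
rawMapping stands in for any DNSToSwitchMapping implementation):

    import java.util.Collections;
    import java.util.List;

    List<String> hosts = Collections.singletonList("foo1.example.com");
    CachedDNSToSwitchMapping mapping = new CachedDNSToSwitchMapping(rawMapping);
    mapping.resolve(hosts);            // resolves via rawMapping and caches
    mapping.resolve(hosts);            // answered from the cache
    mapping.reloadCachedMappings();    // cache.clear(): drop stale entries
    mapping.resolve(hosts);            // consults rawMapping again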

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/DNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/DNSToSwitchMapping.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/DNSToSwitchMapping.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/DNSToSwitchMapping.java Mon Jul 15 22:19:59 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.net;
 
 import java.util.List;
-import java.net.UnknownHostException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -45,5 +44,13 @@ public interface DNSToSwitchMapping {
    * @return list of resolved network paths
    */
   public List<String> resolve(List<String> names);
+  
+  /**
+   * Reload all of the cached mappings.
+   *
+   * If there is a cache, this method will clear it, so that future accesses
+   * will get a chance to see the new data.
+   */
+  public void reloadCachedMappings();
 
 }
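
For implementors outside this tree, the new interface method's contract
amounts to the following minimal sketch (the class name, cache layout, and
"/default-rack" value are illustrative, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ExampleCachingMapping implements DNSToSwitchMapping {
      private final Map<String, String> cache =
          new ConcurrentHashMap<String, String>();

      public List<String> resolve(List<String> names) {
        List<String> paths = new ArrayList<String>(names.size());
        for (String name : names) {
          String path = cache.get(name);
          if (path == null) {
            path = "/default-rack";   // stand-in for a real lookup
            cache.put(name, path);
          }
          paths.add(path);
        }
        return paths;
      }

      public void reloadCachedMappings() {
        cache.clear();                // future resolve() calls see fresh data
      }
    }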

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Mon Jul 15 22:19:59 2013
@@ -371,8 +371,17 @@ public class NetworkTopology {
       throw new IllegalArgumentException(
         "Not allow to add an inner node: "+NodeBase.getPath(node));
     }
+    int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
     netlock.writeLock().lock();
     try {
+      if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+        LOG.error("Error: can't add leaf node at depth " +
+            newDepth + " to topology:\n" + oldTopoStr);
+        throw new InvalidTopologyException("Invalid network topology. " +
+            "You cannot have a rack and a non-rack node at the same " +
+            "level of the network topology.");
+      }
+    
       Node rack = getNodeForNetworkLocation(node);
       if (rack != null && !(rack instanceof InnerNode)) {
         throw new IllegalArgumentException("Unexpected data node " 
@@ -387,14 +396,6 @@ public class NetworkTopology {
         if (!(node instanceof InnerNode)) {
           if (depthOfAllLeaves == -1) {
             depthOfAllLeaves = node.getLevel();
-          } else {
-            if (depthOfAllLeaves != node.getLevel()) {
-              LOG.error("Error: can't add leaf node at depth " +
-                  node.getLevel() + " to topology:\n" + oldTopoStr);
-              throw new InvalidTopologyException("Invalid network topology. " +
-                  "You cannot have a rack and a non-rack node at the same " +
-                  "level of the network topology.");
-            }
           }
         }
       }
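
With the check hoisted ahead of any mutation, an inconsistent add now fails
before the tree is modified. A worked example of the depth rule (a standalone
sketch, not part of this commit):

    NetworkTopology topo = new NetworkTopology();
    topo.add(new NodeBase("h1", "/d1/r1"));  // leaf at depth 3 (2 separators + 1)
    topo.add(new NodeBase("h2", "/d1/r2"));  // same depth, accepted
    try {
      topo.add(new NodeBase("h3", "/d1"));   // would put a leaf at depth 2
    } catch (NetworkTopology.InvalidTopologyException e) {
      // rejected up front; h1 and h2 are still in the topology, unchanged
    }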

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NodeBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NodeBase.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NodeBase.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NodeBase.java Mon Jul 15 22:19:59 2013
@@ -131,4 +131,16 @@ public class NodeBase implements Node {
   public void setLevel(int level) {
     this.level = level;
   }
+  
+  public static int locationToDepth(String location) {
+    String normalizedLocation = normalize(location);
+    int length = normalizedLocation.length();
+    int depth = 0;
+    for (int i = 0; i < length; i++) {
+      if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
+        depth++;
+      }
+    }
+    return depth;
+  }
 }
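
The helper counts PATH_SEPARATOR ('/') characters in the normalized location
string, so for example:

    NodeBase.locationToDepth("/d2/r4");  // returns 2
    NodeBase.locationToDepth("/d2");     // returns 1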

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/ScriptBasedMapping.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/ScriptBasedMapping.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/ScriptBasedMapping.java Mon Jul 15 22:19:59 2013
@@ -155,5 +155,11 @@ implements Configurable
     }
     return allOutput.toString();
   }
+  
+  @Override
+  public void reloadCachedMappings() {
+    // nothing to do here, since RawScriptBasedMapping has no cache.
+  }
+  
   }
 }

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jul 15 22:19:59 2013
@@ -114,6 +114,7 @@ import org.apache.hadoop.metrics2.util.M
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
@@ -2848,99 +2849,107 @@ public class FSNamesystem implements FSC
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getBlockKeys();
+    try {
+      nodeReg.exportedKeys = getBlockKeys();
       
-    NameNode.stateChangeLog.info(
-                                 "BLOCK* registerDatanode: "
-                                 + "node registration from " + nodeReg.getName()
-                                 + " storage " + nodeReg.getStorageID());
+      NameNode.stateChangeLog.info(
+                                   "BLOCK* registerDatanode: "
+                                   + "node registration from " + nodeReg.getName()
+                                   + " storage " + nodeReg.getStorageID());
 
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
+      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
       
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* registerDatanode: "
-                        + "node from name: " + nodeN.getName());
-      // nodeN previously served a different data storage, 
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data 
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
-                                      + "node restarted");
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing 
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have same storageID 
-          by (insanely rare) random chance. User needs to restart one of the
-          nodes with its data cleared (or user can just remove the StorageID
-          value in "VERSION" file under the data directory of the datanode,
-          but this is might not work if VERSION file format has changed 
-       */        
-        NameNode.stateChangeLog.info( "BLOCK* registerDatanode: "
-                                      + "node " + nodeS.getName()
-                                      + " is replaced by " + nodeReg.getName() + 
-                                      " with the same storageID " +
-                                      nodeReg.getStorageID());
-      }
-      // update cluster map
-      clusterMap.remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setHostName(hostName);
+      if (nodeN != null && nodeN != nodeS) {
+        NameNode.LOG.info("BLOCK* registerDatanode: "
+                          + "node from name: " + nodeN.getName());
+        // nodeN previously served a different data storage, 
+        // which is not served by anybody anymore.
+        removeDatanode(nodeN);
+        // physically remove node from datanodeMap
+        wipeDatanode(nodeN);
+        nodeN = null;
+      }
+
+      if (nodeS != null) {
+        if (nodeN == nodeS) {
+          // The same datanode has been just restarted to serve the same data 
+          // storage. We do not need to remove old data blocks, the delta will
+          // be calculated on the next block report from the datanode
+          NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
+                                        + "node restarted");
+        } else {
+          // nodeS is found
+          /* The registering datanode is a replacement node for the existing 
+            data storage, which from now on will be served by a new node.
+            If this message repeats, both nodes might have same storageID 
+            by (insanely rare) random chance. User needs to restart one of the
+            nodes with its data cleared (or user can just remove the StorageID
+            value in "VERSION" file under the data directory of the datanode,
+            but this might not work if the VERSION file format has changed 
+         */        
+          NameNode.stateChangeLog.info( "BLOCK* registerDatanode: "
+                                        + "node " + nodeS.getName()
+                                        + " is replaced by " + nodeReg.getName() + 
+                                        " with the same storageID " +
+                                        nodeReg.getStorageID());
+        }
+        // update cluster map
+        clusterMap.remove(nodeS);
+        nodeS.updateRegInfo(nodeReg);
+        nodeS.setHostName(hostName);
       
-      // resolve network location
-      resolveNetworkLocation(nodeS);
-      clusterMap.add(nodeS);
+        // resolve network location
+        resolveNetworkLocation(nodeS);
+        clusterMap.add(nodeS);
         
-      // also treat the registration message as a heartbeat
-      synchronized(heartbeats) {
-        if( !heartbeats.contains(nodeS)) {
-          heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0);
-          nodeS.isAlive = true;
+        // also treat the registration message as a heartbeat
+        synchronized(heartbeats) {
+          if( !heartbeats.contains(nodeS)) {
+            heartbeats.add(nodeS);
+            //update its timestamp
+            nodeS.updateHeartbeat(0L, 0L, 0L, 0);
+            nodeS.isAlive = true;
+          }
         }
-      }
-      return;
-    } 
+        return;
+      } 
 
-    // this is a new datanode serving a new data storage
-    if (nodeReg.getStorageID().equals("")) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.storageID = newStorageID();
-      NameNode.stateChangeLog.debug(
-                                    "BLOCK* registerDatanode: "
-                                    + "new storageID " + nodeReg.getStorageID() + " assigned");
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
-    resolveNetworkLocation(nodeDescr);
-    unprotectedAddDatanode(nodeDescr);
-    clusterMap.add(nodeDescr);
+      // this is a new datanode serving a new data storage
+      if (nodeReg.getStorageID().equals("")) {
+        // this data storage has never been registered
+        // it is either empty or was created by pre-storageID version of DFS
+        nodeReg.storageID = newStorageID();
+        NameNode.stateChangeLog.debug(
+                                      "BLOCK* registerDatanode: "
+                                      + "new storageID " + nodeReg.getStorageID() + " assigned");
+      }
+      // register new datanode
+      DatanodeDescriptor nodeDescr 
+        = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
+      resolveNetworkLocation(nodeDescr);
+      clusterMap.add(nodeDescr); // may throw InvalidTopologyException
+      unprotectedAddDatanode(nodeDescr);
       
-    // also treat the registration message as a heartbeat
-    synchronized(heartbeats) {
-      heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because its is done when the descriptor is created
-    }
+      // also treat the registration message as a heartbeat
+      synchronized(heartbeats) {
+        heartbeats.add(nodeDescr);
+        nodeDescr.isAlive = true;
+        // no need to update its timestamp
+        // because it is done when the descriptor is created
+      }
 
-    if (safeMode != null) {
-      safeMode.checkMode();
+      if (safeMode != null) {
+        safeMode.checkMode();
+      }
+      return;
+    } catch (InvalidTopologyException e) {
+      // If the network location is invalid, clear the cached mappings
+      // so that we have a chance to re-add this DataNode with the
+      // correct network location later.
+      dnsToSwitchMapping.reloadCachedMappings();
+      throw e;
     }
-    return;
   }
     
   /* Resolve a node's network location */
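
Stripped of the re-indented bulk, the pattern this hunk introduces is small
(a sketch using the names from the code above, with the rest of the method
body elided):

    try {
      resolveNetworkLocation(nodeDescr);
      clusterMap.add(nodeDescr);      // may throw InvalidTopologyException
      unprotectedAddDatanode(nodeDescr);
    } catch (InvalidTopologyException e) {
      // A stale cached mapping produced a bad location; clear the cache so
      // the DataNode's next registration attempt resolves a fresh one.
      dnsToSwitchMapping.reloadCachedMappings();
      throw e;
    }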

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Mon Jul 15 22:19:59 2013
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.StaticMapping;
@@ -69,6 +72,7 @@ public class MiniDFSCluster {
     }
   }
 
+  private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
   private Configuration conf;
   protected NameNode nameNode;
   protected int numDataNodes;
@@ -381,7 +385,6 @@ public class MiniDFSCluster {
                         operation != StartupOption.ROLLBACK) ?
         null : new String[] {operation.getName()};
     
-    
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new Configuration(conf);
       if (manageDfsDirs) {
@@ -431,8 +434,12 @@ public class MiniDFSCluster {
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
       }
-      DataNode.runDatanodeDaemon(dn);
-      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
+      try {
+        dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
+        DataNode.runDatanodeDaemon(dn);
+      } catch (RemoteException e) {
+        LOG.warn("Datanode:" + dn.getHostName() + " is failed to be started!");
+      }
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
@@ -870,6 +877,14 @@ public class MiniDFSCluster {
   }
 
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+  
+    for (DataNodeProperties datanode: dataNodes) {
+      // If any datanode failed to start, then do not wait 
+      if (nameNode.getNamesystem().getDataNodeInfo(
+          datanode.datanode.dnRegistration.getStorageID()) == null) {
+        return false;
+      }
+    }
     if (dnInfo.length != numDataNodes) {
       return true;
     }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/StaticMapping.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/StaticMapping.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/StaticMapping.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/StaticMapping.java Mon Jul 15 22:19:59 2013
@@ -59,4 +59,9 @@ public class StaticMapping extends Confi
       return m;
     }
   }
+  
+  public void reloadCachedMappings() {
+    // reloadCachedMappings does nothing for StaticMapping; there is
+    // nowhere to reload from since all data is in memory.
+  }
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopology.java?rev=1503497&r1=1503496&r2=1503497&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/net/TestNetworkTopology.java Mon Jul 15 22:19:59 2013
@@ -24,8 +24,18 @@ import java.util.Map;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.junit.Assert;
 
 public class TestNetworkTopology extends TestCase {
   private final static NetworkTopology cluster = new NetworkTopology();
@@ -41,6 +51,8 @@ public class TestNetworkTopology extends
   private final static DatanodeDescriptor NODE = 
     new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r4");
   
+  private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
+  
   static {
     for(int i=0; i<dataNodes.length; i++) {
       cluster.add(dataNodes[i]);
@@ -70,7 +82,7 @@ public class TestNetworkTopology extends
     try {
       invalCluster.add(invalDataNodes[2]);
       fail("expected InvalidTopologyException");
-    } catch (NetworkTopology.InvalidTopologyException e) {
+    } catch (InvalidTopologyException e) {
       assertEquals(e.getMessage(), "Invalid network topology. " +
           "You cannot have a rack and a non-rack node at the same " +
           "level of the network topology.");
@@ -187,4 +199,66 @@ public class TestNetworkTopology extends
       }
     }
   }
+  
+  public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
+    // start a cluster
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      // bad rack topology
+      String racks[] = { "/a/b", "/c" };
+      String hosts[] = { "foo1.example.com", "foo2.example.com" };
+
+      cluster = new MiniDFSCluster(conf, 2, true, racks, hosts);
+      
+      cluster.waitActive();
+     
+      ClientProtocol client = DFSClient.createNamenode(conf);
+      
+      Assert.assertNotNull(client);
+      
+      // Wait for one DataNode to register.
+      // The other DataNode will not be able to register because of the rack mismatch.
+      DatanodeInfo[] info;
+      while (true) {
+        info = client.getDatanodeReport(DatanodeReportType.LIVE);
+        Assert.assertFalse(info.length == 2);
+        if (info.length == 1) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+      // Set the network topology of the other node to match the network
+      // topology of the node that came up.
+      int validIdx = info[0].getHostName().equals(hosts[0]) ? 0 : 1;
+      int invalidIdx = validIdx == 1 ? 0 : 1;
+      StaticMapping.addNodeToRack(hosts[invalidIdx], racks[validIdx]);
+      LOG.info("datanode " + validIdx + " came up with network location " + 
+        info[0].getNetworkLocation());
+
+      // Restart the DN with the invalid topology and wait for it to register.
+      cluster.restartDataNode(invalidIdx);
+      
+      Thread.sleep(5000);
+      while (true) {
+        info = client.getDatanodeReport(DatanodeReportType.LIVE);
+        if (info.length == 2) {
+          break;
+        }
+        if (info.length == 0) {
+          LOG.info("got no valid DNs");
+        } else if (info.length == 1) {
+          LOG.info("got one valid DN: " + info[0].getHostName() +
+              " (at " + info[0].getNetworkLocation() + ")");
+        }
+        Thread.sleep(1000);
+      }
+      Assert.assertEquals(info[0].getNetworkLocation(),
+                          info[1].getNetworkLocation());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }