You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/15 02:13:59 UTC

svn commit: r1201991 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/tools/ src/main/j...

Author: todd
Date: Tue Nov 15 01:13:58 2011
New Revision: 1201991

URL: http://svn.apache.org/viewvc?rev=1201991&view=rev
Log:
HDFS-2476. More CPU efficient data structure for under-replicated, over-replicated, and invalidated blocks. Contributed by Tomasz Nykiel.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 15 01:13:58 2011
@@ -60,6 +60,10 @@ Trunk (unreleased changes)
     HDFS-2495. Increase granularity of write operations in ReplicationMonitor
     thus reducing contention for write lock. (Tomasz Nykiel via hairong)
 
+    HDFS-2476. More CPU efficient data structure for under-replicated,
+               over-replicated, and invalidated blocks.
+               (Tomasz Nykiel via todd)
+
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Nov 15 01:13:58 2011
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -142,8 +143,8 @@ public class BlockManager {
   // eventually remove these extras.
   // Mapping: StorageID -> TreeSet<Block>
   //
-  public final Map<String, Collection<Block>> excessReplicateMap =
-    new TreeMap<String, Collection<Block>>();
+  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
+    new TreeMap<String, LightWeightLinkedSet<Block>>();
 
   //
   // Store set of Blocks that need to be replicated 1 or more times.
@@ -1244,7 +1245,7 @@ public class BlockManager {
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      Collection<Block> excessBlocks =
+      LightWeightLinkedSet<Block> excessBlocks =
         excessReplicateMap.get(node.getStorageID());
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
@@ -1899,7 +1900,7 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -2017,9 +2018,9 @@ public class BlockManager {
 
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
-      excessBlocks = new TreeSet<Block>();
+      excessBlocks = new LightWeightLinkedSet<Block>();
       excessReplicateMap.put(dn.getStorageID(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
@@ -2067,7 +2068,7 @@ public class BlockManager {
       // We've removed a block from a node, so it's definitely no longer
       // in "excess" there.
       //
-      Collection<Block> excessBlocks = excessReplicateMap.get(node
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
           .getStorageID());
       if (excessBlocks != null) {
         if (excessBlocks.remove(block)) {
@@ -2217,8 +2218,8 @@ public class BlockManager {
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       } else {
-        Collection<Block> blocksExcess =
-          excessReplicateMap.get(node.getStorageID());
+        LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
+            .getStorageID());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Nov 15 01:13:58 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.Deprecated
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -120,11 +121,11 @@ public class DatanodeDescriptor extends 
   private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
                                 new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
-  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+  private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
-   * in case of errors (e.g. datanode does not report if an error occurs 
+   * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
   private int currApproxBlocksScheduled = 0;
@@ -400,45 +401,11 @@ public class DatanodeDescriptor extends 
    * Remove the specified number of blocks to be invalidated
    */
   public Block[] getInvalidateBlocks(int maxblocks) {
-    return getBlockArray(invalidateBlocks, maxblocks); 
-  }
-
-  static private Block[] getBlockArray(Collection<Block> blocks, int max) {
-    Block[] blockarray = null;
-    synchronized(blocks) {
-      int available = blocks.size();
-      int n = available;
-      if (max > 0 && n > 0) {
-        if (max < n) {
-          n = max;
-        }
-        // allocate the properly sized block array ... 
-        blockarray = new Block[n];
-
-        // iterate tree collecting n blocks... 
-        Iterator<Block> e = blocks.iterator();
-        int blockCount = 0;
-
-        while (blockCount < n && e.hasNext()) {
-          // insert into array ... 
-          blockarray[blockCount++] = e.next();
-
-          // remove from tree via iterator, if we are removing 
-          // less than total available blocks
-          if (n < available){
-            e.remove();
-          }
-        }
-        assert(blockarray.length == n);
-        
-        // now if the number of blocks removed equals available blocks,
-        // them remove all blocks in one fell swoop via clear
-        if (n == available) { 
-          blocks.clear();
-        }
-      }
+    synchronized (invalidateBlocks) {
+      Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
+          invalidateBlocks.size(), maxblocks)]);
+      return deleteList.length == 0 ? null : deleteList;
     }
-    return blockarray;
   }
 
   /** Serialization for FSEditLog */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue Nov 15 01:13:58 2011
@@ -30,8 +30,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 
-/** 
+/**
  * Keeps a Collection for every named machine containing blocks
  * that have recently been invalidated and are thought to live
  * on the machine in question.
@@ -39,8 +40,8 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceAudience.Private
 class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, Collection<Block>> node2blocks =
-      new TreeMap<String, Collection<Block>>();
+  private final Map<String, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
@@ -67,9 +68,9 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    Collection<Block> set = node2blocks.get(datanode.getStorageID());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
     if (set == null) {
-      set = new HashSet<Block>();
+      set = new LightWeightHashSet<Block>();
       node2blocks.put(datanode.getStorageID(), set);
     }
     if (set.add(block)) {
@@ -83,7 +84,7 @@ class InvalidateBlocks {
 
   /** Remove a storage from the invalidatesSet */
   synchronized void remove(final String storageID) {
-    final Collection<Block> blocks = node2blocks.remove(storageID);
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
@@ -91,7 +92,7 @@ class InvalidateBlocks {
 
   /** Remove the block from the specified storage. */
   synchronized void remove(final String storageID, final Block block) {
-    final Collection<Block> v = node2blocks.get(storageID);
+    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
@@ -109,8 +110,8 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
-      final Collection<Block> blocks = entry.getValue();
+    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+      final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
         out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
       }
@@ -143,21 +144,17 @@ class InvalidateBlocks {
 
   private synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
-    final Collection<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
     final int limit = datanodeManager.blockInvalidateLimit;
-    final List<Block> toInvalidate = new ArrayList<Block>(limit);
-    final Iterator<Block> it = set.iterator();
-    for(int count = 0; count < limit && it.hasNext(); count++) {
-      toInvalidate.add(it.next());
-      it.remove();
-    }
+    final List<Block> toInvalidate = set.pollN(limit);
+
     // If we send everything in this message, remove this node entry
-    if (!it.hasNext()) {
+    if (set.isEmpty()) {
       remove(storageId);
     }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Tue Nov 15 01:13:58 2011
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
@@ -80,13 +81,13 @@ class UnderReplicatedBlocks implements I
   /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
-  private final List<NavigableSet<Block>> priorityQueues
-      = new ArrayList<NavigableSet<Block>>(LEVEL);
+  private List<LightWeightLinkedSet<Block>> priorityQueues
+      = new ArrayList<LightWeightLinkedSet<Block>>();
 
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
-      priorityQueues.add(new TreeSet<Block>());
+      priorityQueues.add(new LightWeightLinkedSet<Block>());
     }
   }
 
@@ -123,10 +124,10 @@ class UnderReplicatedBlocks implements I
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
   }
-  
+
   /** Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
-    for (NavigableSet<Block> set : priorityQueues) {
+    for(LightWeightLinkedSet<Block> set : priorityQueues) {
       if (set.contains(block)) {
         return true;
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 15 01:13:58 2011
@@ -3997,7 +3997,7 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-      String startBlockAfter) throws IOException {
+	String[] cookieTab) throws IOException {
 
     readLock();
     try {
@@ -4006,23 +4006,27 @@ public class FSNamesystem implements Nam
                               "replication queues have not been initialized.");
       }
       checkSuperuserPrivilege();
-      long startBlockId = 0;
       // print a limited # of corrupt files per call
       int count = 0;
       ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
-      
-      if (startBlockAfter != null) {
-        startBlockId = Block.filename2id(startBlockAfter);
-      }
 
       final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator();
+
+      if (cookieTab == null) {
+        cookieTab = new String[] { null };
+      }
+      int skip = getIntCookie(cookieTab[0]);
+      for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
+        blkIterator.next();
+      }
+
       while (blkIterator.hasNext()) {
         Block blk = blkIterator.next();
         INode inode = blockManager.getINode(blk);
+        skip++;
         if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
           String src = FSDirectory.getFullPathName(inode);
-          if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
-              && (src.startsWith(path))) {
+          if (src.startsWith(path)){
             corruptFiles.add(new CorruptFileBlockInfo(src, blk));
             count++;
             if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
@@ -4030,13 +4034,32 @@ public class FSNamesystem implements Nam
           }
         }
       }
+      cookieTab[0] = String.valueOf(skip);
       LOG.info("list corrupt file blocks returned: " + count);
       return corruptFiles;
     } finally {
       readUnlock();
     }
   }
-  
+
+  /**
+   * Convert string cookie to integer.
+   */
+  private static int getIntCookie(String cookie){
+    int c;
+    if(cookie == null){
+      c = 0;
+    } else {
+      try{
+        c = Integer.parseInt(cookie);
+      }catch (NumberFormatException e) {
+        c = 0;
+      }
+    }
+    c = Math.max(0, c);
+    return c;
+  }
+
   /**
    * Create delegation token secret manager
    */

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Nov 15 01:13:58 2011
@@ -698,17 +698,16 @@ class NameNodeRpcServer implements Namen
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
+	String[] cookieTab = new String[] { cookie };
     Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
-      namesystem.listCorruptFileBlocks(path, cookie);
-    
+      namesystem.listCorruptFileBlocks(path, cookieTab);
+
     String[] files = new String[fbs.size()];
-    String lastCookie = "";
     int i = 0;
     for(FSNamesystem.CorruptFileBlockInfo fb: fbs) {
       files[i++] = fb.path;
-      lastCookie = fb.block.getBlockName();
     }
-    return new CorruptFileBlocks(files, lastCookie);
+    return new CorruptFileBlocks(files, cookieTab[0]);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Nov 15 01:13:58 2011
@@ -114,11 +114,11 @@ public class NamenodeFsck {
   // We return back N files that are corrupt; the list of files returned is
   // ordered by block id; to allow continuation support, pass in the last block
   // # from previous call
-  private String startBlockAfter = null;
-  
+  private String[] currentCookie = new String[] { null };
+
   private final Configuration conf;
   private final PrintWriter out;
-  
+
   /**
    * Filesystem checker.
    * @param conf configuration (namenode config)
@@ -156,11 +156,11 @@ public class NamenodeFsck {
         this.showCorruptFileBlocks = true;
       }
       else if (key.equals("startblockafter")) {
-        this.startBlockAfter = pmap.get("startblockafter")[0]; 
+        this.currentCookie[0] = pmap.get("startblockafter")[0];
       }
     }
   }
-  
+
   /**
    * Check files on DFS, starting from the indicated path.
    */
@@ -216,19 +216,20 @@ public class NamenodeFsck {
       out.close();
     }
   }
- 
+
   private void listCorruptFileBlocks() throws IOException {
     Collection<FSNamesystem.CorruptFileBlockInfo> corruptFiles = namenode.
-      getNamesystem().listCorruptFileBlocks(path, startBlockAfter);
+      getNamesystem().listCorruptFileBlocks(path, currentCookie);
     int numCorruptFiles = corruptFiles.size();
     String filler;
     if (numCorruptFiles > 0) {
       filler = Integer.toString(numCorruptFiles);
-    } else if (startBlockAfter == null) {
+    } else if (currentCookie[0].equals("0")) {
       filler = "no";
     } else {
       filler = "no more";
     }
+    out.println("Cookie:\t" + currentCookie[0]);
     for (FSNamesystem.CorruptFileBlockInfo c : corruptFiles) {
       out.println(c.toString());
     }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java Tue Nov 15 01:13:58 2011
@@ -145,14 +145,15 @@ public class DFSck extends Configured im
       throws IOException {
     int errCode = -1;
     int numCorrupt = 0;
-    String lastBlock = null;
+    int cookie = 0;
     final String noCorruptLine = "has no CORRUPT files";
     final String noMoreCorruptLine = "has no more CORRUPT files";
+    final String cookiePrefix = "Cookie:";
     boolean allDone = false;
     while (!allDone) {
       final StringBuffer url = new StringBuffer(baseUrl);
-      if (lastBlock != null) {
-        url.append("&startblockafter=").append(lastBlock);
+      if (cookie > 0) {
+        url.append("&startblockafter=").append(String.valueOf(cookie));
       }
       URL path = new URL(url.toString());
       SecurityUtil.fetchServiceTicket(path);
@@ -163,29 +164,31 @@ public class DFSck extends Configured im
       try {
         String line = null;
         while ((line = input.readLine()) != null) {
-          if ((line.endsWith(noCorruptLine)) || 
+          if (line.startsWith(cookiePrefix)){
+            try{
+              cookie = Integer.parseInt(line.split("\t")[1]);
+            } catch (Exception e){
+              allDone = true;
+              break;
+            }
+            continue;
+          }
+          if ((line.endsWith(noCorruptLine)) ||
               (line.endsWith(noMoreCorruptLine)) ||
               (line.endsWith(NamenodeFsck.NONEXISTENT_STATUS))) {
             allDone = true;
             break;
           }
           if ((line.isEmpty())
-              || (line.startsWith("FSCK started by")) 
+              || (line.startsWith("FSCK started by"))
               || (line.startsWith("The filesystem under path")))
             continue;
           numCorrupt++;
           if (numCorrupt == 1) {
-            out.println("The list of corrupt files under path '" 
+            out.println("The list of corrupt files under path '"
                 + dir + "' are:");
           }
           out.println(line);
-          try {
-            // Get the block # that we need to send in next call
-            lastBlock = line.split("\t")[0];
-          } catch (Exception e) {
-            allDone = true;
-            break;
-          }
         }
       } finally {
         input.close();

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java?rev=1201991&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java Tue Nov 15 01:13:58 2011
@@ -0,0 +1,618 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A low memory linked hash set implementation, which uses an array for storing
+ * the elements and linked lists for collision resolution. This class does not
+ * support null element.
+ *
+ * This class is not thread safe.
+ *
+ */
+public class LightWeightHashSet<T> implements Collection<T> {
+  /**
+   * Elements of {@link LightWeightLinkedSet}.
+   */
+  static class LinkedElement<T> {
+    protected final T element;
+
+    // reference to the next entry within a bucket linked list
+    protected LinkedElement<T> next;
+
+    //hashCode of the element
+    protected final int hashCode;
+
+    public LinkedElement(T elem, int hash) {
+      this.element = elem;
+      this.next = null;
+      this.hashCode = hash;
+    }
+
+    public String toString() {
+      return element.toString();
+    }
+  }
+
+  protected static final float DEFAULT_MAX_LOAD_FACTOR = 0.75f;
+  protected static final float DEFAUT_MIN_LOAD_FACTOR = 0.2f;
+  protected static final int MINIMUM_CAPACITY = 16;
+
+  static final int MAXIMUM_CAPACITY = 1 << 30;
+  private static final Log LOG = LogFactory.getLog(LightWeightHashSet.class);
+
+  /**
+   * An internal array of entries, which are the rows of the hash table. The
+   * size must be a power of two.
+   */
+  protected LinkedElement<T>[] entries;
+  /** Size of the entry table. */
+  private int capacity;
+  /** The size of the set (not the entry array). */
+  protected int size = 0;
+  /** Hashmask used for determining the bucket index **/
+  private int hash_mask;
+  /** Capacity at initialization time **/
+  private final int initialCapacity;
+
+  /**
+   * Modification version for fail-fast.
+   *
+   * @see ConcurrentModificationException
+   */
+  protected volatile int modification = 0;
+
+  private float maxLoadFactor;
+  private float minLoadFactor;
+  private int expandMultiplier = 2;
+
+  private int expandThreshold;
+  private int shrinkThreshold;
+
+  /**
+   * @param initCapacity
+   *          Recommended size of the internal array.
+   * @param maxLoadFactor
+   *          used to determine when to expand the internal array
+   * @param minLoadFactor
+   *          used to determine when to shrink the internal array
+   */
+  @SuppressWarnings("unchecked")
+  public LightWeightHashSet(int initCapacity, float maxLoadFactor,
+      float minLoadFactor) {
+
+    if (maxLoadFactor <= 0 || maxLoadFactor > 1.0f)
+      throw new IllegalArgumentException("Illegal maxload factor: "
+          + maxLoadFactor);
+
+    if (minLoadFactor <= 0 || minLoadFactor > maxLoadFactor)
+      throw new IllegalArgumentException("Illegal minload factor: "
+          + minLoadFactor);
+
+    this.initialCapacity = computeCapacity(initCapacity);
+    this.capacity = this.initialCapacity;
+    this.hash_mask = capacity - 1;
+
+    this.maxLoadFactor = maxLoadFactor;
+    this.expandThreshold = (int) (capacity * maxLoadFactor);
+    this.minLoadFactor = minLoadFactor;
+    this.shrinkThreshold = (int) (capacity * minLoadFactor);
+
+    entries = new LinkedElement[capacity];
+    LOG.debug("initial capacity=" + initialCapacity + ", max load factor= "
+        + maxLoadFactor + ", min load factor= " + minLoadFactor);
+  }
+
+  public LightWeightHashSet() {
+    this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
+  }
+
+  public LightWeightHashSet(int minCapacity) {
+    this(minCapacity, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
+  }
+
+  /**
+   * Check if the set is empty.
+   *
+   * @return true is set empty, false otherwise
+   */
+  public boolean isEmpty() {
+    return size == 0;
+  }
+
+  /**
+   * Return the current capacity (for testing).
+   */
+  public int getCapacity() {
+    return capacity;
+  }
+
+  /**
+   * Return the number of stored elements.
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Get index in the internal table for a given hash.
+   */
+  protected int getIndex(int hashCode) {
+    return hashCode & hash_mask;
+  }
+
+  /**
+   * Check if the set contains given element
+   *
+   * @return true if element present, false otherwise.
+   */
+  @SuppressWarnings("unchecked")
+  public boolean contains(final Object key) {
+    // validate key
+    if (key == null) {
+      throw new IllegalArgumentException("Null element is not supported.");
+    }
+    // find element
+    final int hashCode = ((T)key).hashCode();
+    final int index = getIndex(hashCode);
+    return containsElem(index, (T) key, hashCode);
+  }
+
+  /**
+   * Check if the set contains given element at given index.
+   *
+   * @return true if element present, false otherwise.
+   */
+  protected boolean containsElem(int index, final T key, int hashCode) {
+    for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
+      // element found
+      if (hashCode == e.hashCode && e.element.equals(key)) {
+        return true;
+      }
+    }
+    // element not found
+    return false;
+  }
+
+  /**
+   * All all elements in the collection. Expand if necessary.
+   *
+   * @param toAdd - elements to add.
+   * @return true if the set has changed, false otherwise
+   */
+  public boolean addAll(Collection<? extends T> toAdd) {
+    boolean changed = false;
+    for (T elem : toAdd) {
+      changed |= addElem(elem);
+    }
+    expandIfNecessary();
+    return changed;
+  }
+
+  /**
+   * Add given element to the hash table. Expand table if necessary.
+   *
+   * @return true if the element was not present in the table, false otherwise
+   */
+  public boolean add(final T element) {
+    boolean added = addElem(element);
+    expandIfNecessary();
+    return added;
+  }
+
+  /**
+   * Add given element to the hash table
+   *
+   * @return true if the element was not present in the table, false otherwise
+   */
+  protected boolean addElem(final T element) {
+    // validate element
+    if (element == null) {
+      throw new IllegalArgumentException("Null element is not supported.");
+    }
+    // find hashCode & index
+    final int hashCode = element.hashCode();
+    final int index = getIndex(hashCode);
+    // return false if already present
+    if (containsElem(index, element, hashCode)) {
+      return false;
+    }
+
+    modification++;
+    size++;
+
+    // update bucket linked list
+    LinkedElement<T> le = new LinkedElement<T>(element, hashCode);
+    le.next = entries[index];
+    entries[index] = le;
+    return true;
+  }
+
+  /**
+   * Remove the element corresponding to the key.
+   *
+   * @return If such element exists, return true. Otherwise, return false.
+   */
+  @SuppressWarnings("unchecked")
+  public boolean remove(final Object key) {
+    // validate key
+    if (key == null) {
+      throw new IllegalArgumentException("Null element is not supported.");
+    }
+    LinkedElement<T> removed = removeElem((T) key);
+    shrinkIfNecessary();
+    return removed == null ? false : true;
+  }
+
+  /**
+   * Remove the element corresponding to the key, given key.hashCode() == index.
+   *
+   * @return If such element exists, return true. Otherwise, return false.
+   */
+  protected LinkedElement<T> removeElem(final T key) {
+    LinkedElement<T> found = null;
+    final int hashCode = key.hashCode();
+    final int index = getIndex(hashCode);
+    if (entries[index] == null) {
+      return null;
+    } else if (hashCode == entries[index].hashCode &&
+            entries[index].element.equals(key)) {
+      // remove the head of the bucket linked list
+      modification++;
+      size--;
+      found = entries[index];
+      entries[index] = found.next;
+    } else {
+      // head != null and key is not equal to head
+      // search the element
+      LinkedElement<T> prev = entries[index];
+      for (found = prev.next; found != null;) {
+        if (hashCode == found.hashCode &&
+                found.element.equals(key)) {
+          // found the element, remove it
+          modification++;
+          size--;
+          prev.next = found.next;
+          found.next = null;
+          break;
+        } else {
+          prev = found;
+          found = found.next;
+        }
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Remove and return n elements from the hashtable.
+   * The order in which entries are removed is unspecified, and
+   * and may not correspond to the order in which they were inserted.
+   *
+   * @return first element
+   */
+  public List<T> pollN(int n) {
+    if (n >= size) {
+      return pollAll();
+    }
+    List<T> retList = new ArrayList<T>(n);
+    if (n == 0) {
+      return retList;
+    }
+    boolean done = false;
+    int currentBucketIndex = 0;
+
+    while (!done) {
+      LinkedElement<T> current = entries[currentBucketIndex];
+      while (current != null) {
+        retList.add(current.element);
+        current = current.next;
+        entries[currentBucketIndex] = current;
+        size--;
+        modification++;
+        if (--n == 0) {
+          done = true;
+          break;
+        }
+      }
+      currentBucketIndex++;
+    }
+    shrinkIfNecessary();
+    return retList;
+  }
+
+  /**
+   * Remove all elements from the set and return them. Clear the entries.
+   */
+  public List<T> pollAll() {
+    List<T> retList = new ArrayList<T>(size);
+    for (int i = 0; i < entries.length; i++) {
+      LinkedElement<T> current = entries[i];
+      while (current != null) {
+        retList.add(current.element);
+        current = current.next;
+      }
+    }
+    this.clear();
+    return retList;
+  }
+
+  /**
+   * Get array.length elements from the set, and put them into the array.
+   */
+  @SuppressWarnings("unchecked")
+  public T[] pollToArray(T[] array) {
+    int currentIndex = 0;
+    LinkedElement<T> current = null;
+
+    if (array.length == 0) {
+      return array;
+    }
+    if (array.length > size) {
+      array = (T[]) java.lang.reflect.Array.newInstance(array.getClass()
+          .getComponentType(), size);
+    }
+    // do fast polling if the entire set needs to be fetched
+    if (array.length == size) {
+      for (int i = 0; i < entries.length; i++) {
+        current = entries[i];
+        while (current != null) {
+          array[currentIndex++] = current.element;
+          current = current.next;
+        }
+      }
+      this.clear();
+      return array;
+    }
+
+    boolean done = false;
+    int currentBucketIndex = 0;
+
+    while (!done) {
+      current = entries[currentBucketIndex];
+      while (current != null) {
+        array[currentIndex++] = current.element;
+        current = current.next;
+        entries[currentBucketIndex] = current;
+        size--;
+        modification++;
+        if (currentIndex == array.length) {
+          done = true;
+          break;
+        }
+      }
+      currentBucketIndex++;
+    }
+    shrinkIfNecessary();
+    return array;
+  }
+
+  /**
+   * Compute capacity given initial capacity.
+   *
+   * @return final capacity, either MIN_CAPACITY, MAX_CAPACITY, or power of 2
+   *         closest to the requested capacity.
+   */
+  private int computeCapacity(int initial) {
+    if (initial < MINIMUM_CAPACITY) {
+      return MINIMUM_CAPACITY;
+    }
+    if (initial > MAXIMUM_CAPACITY) {
+      return MAXIMUM_CAPACITY;
+    }
+    int capacity = 1;
+    while (capacity < initial) {
+      capacity <<= 1;
+    }
+    return capacity;
+  }
+
+  /**
+   * Resize the internal table to given capacity.
+   */
+  @SuppressWarnings("unchecked")
+  private void resize(int cap) {
+    int newCapacity = computeCapacity(cap);
+    if (newCapacity == this.capacity) {
+      return;
+    }
+    this.capacity = newCapacity;
+    this.expandThreshold = (int) (capacity * maxLoadFactor);
+    this.shrinkThreshold = (int) (capacity * minLoadFactor);
+    this.hash_mask = capacity - 1;
+    LinkedElement<T>[] temp = entries;
+    entries = new LinkedElement[capacity];
+    for (int i = 0; i < temp.length; i++) {
+      LinkedElement<T> curr = temp[i];
+      while (curr != null) {
+        LinkedElement<T> next = curr.next;
+        int index = getIndex(curr.hashCode);
+        curr.next = entries[index];
+        entries[index] = curr;
+        curr = next;
+      }
+    }
+  }
+
+  /**
+   * Checks if we need to shrink, and shrinks if necessary.
+   */
+  protected void shrinkIfNecessary() {
+    if (size < this.shrinkThreshold && capacity > initialCapacity) {
+      resize(capacity / expandMultiplier);
+    }
+  }
+
+  /**
+   * Checks if we need to expand, and expands if necessary.
+   */
+  protected void expandIfNecessary() {
+    if (size > this.expandThreshold && capacity < MAXIMUM_CAPACITY) {
+      resize(capacity * expandMultiplier);
+    }
+  }
+
+  public Iterator<T> iterator() {
+    return new LinkedSetIterator();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("(size=").append(size).append(", modification=")
+        .append(modification).append(", entries.length=")
+        .append(entries.length).append(")");
+    return b.toString();
+  }
+
+  /** Print detailed information of this object. */
+  public void printDetails(final PrintStream out) {
+    out.print(this + ", entries = [");
+    for (int i = 0; i < entries.length; i++) {
+      if (entries[i] != null) {
+        LinkedElement<T> e = entries[i];
+        out.print("\n  " + i + ": " + e);
+        for (e = e.next; e != null; e = e.next) {
+          out.print(" -> " + e);
+        }
+      }
+    }
+    out.println("\n]");
+  }
+
+  private class LinkedSetIterator implements Iterator<T> {
+    /** The starting modification for fail-fast. */
+    private final int startModification = modification;
+    /** The current index of the entry array. */
+    private int index = -1;
+    /** The next element to return. */
+    private LinkedElement<T> next = nextNonemptyEntry();
+
+    private LinkedElement<T> nextNonemptyEntry() {
+      for (index++; index < entries.length && entries[index] == null; index++);
+      return index < entries.length ? entries[index] : null;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return next != null;
+    }
+
+    @Override
+    public T next() {
+      if (modification != startModification) {
+        throw new ConcurrentModificationException("modification="
+            + modification + " != startModification = " + startModification);
+      }
+      if (next == null) {
+        throw new NoSuchElementException();
+      }
+      final T e = next.element;
+      // find the next element
+      final LinkedElement<T> n = next.next;
+      next = n != null ? n : nextNonemptyEntry();
+      return e;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported.");
+    }
+  }
+
+  /**
+   * Clear the set. Resize it to the original capacity.
+   */
+  @SuppressWarnings("unchecked")
+  public void clear() {
+    this.capacity = this.initialCapacity;
+    this.hash_mask = capacity - 1;
+
+    this.expandThreshold = (int) (capacity * maxLoadFactor);
+    this.shrinkThreshold = (int) (capacity * minLoadFactor);
+
+    entries = new LinkedElement[capacity];
+    size = 0;
+    modification++;
+  }
+
+  @Override
+  public Object[] toArray() {
+    Object[] result = new Object[size];
+    return toArray(result);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <U> U[] toArray(U[] a) {
+    if (a == null) {
+      throw new NullPointerException("Input array can not be null");
+    }
+    if (a.length < size) {
+      a = (U[]) java.lang.reflect.Array.newInstance(a.getClass()
+          .getComponentType(), size);
+    }
+    int currentIndex = 0;
+    for (int i = 0; i < entries.length; i++) {
+      LinkedElement<T> current = entries[i];
+      while (current != null) {
+        a[currentIndex++] = (U) current.element;
+        current = current.next;
+      }
+    }
+    return a;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    Iterator<?> iter = c.iterator();
+    while (iter.hasNext()) {
+      if (!contains(iter.next())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    boolean changed = false;
+    Iterator<?> iter = c.iterator();
+    while (iter.hasNext()) {
+      changed |= remove(iter.next());
+    }
+    return changed;
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("retainAll is not supported.");
+  }
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java?rev=1201991&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java Tue Nov 15 01:13:58 2011
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A low memory linked hash set implementation, which uses an array for storing
+ * the elements and linked lists for collision resolution. In addition it stores
+ * elements in a linked list to ensure ordered traversal. This class does not
+ * support null element.
+ *
+ * This class is not thread safe.
+ *
+ */
+public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
+  /**
+   * Elements of {@link LightWeightLinkedSet}.
+   */
+  static class DoubleLinkedElement<T> extends LinkedElement<T> {
+    // references to elements within all-element linked list
+    private DoubleLinkedElement<T> before;
+    private DoubleLinkedElement<T> after;
+
+    public DoubleLinkedElement(T elem, int hashCode) {
+      super(elem, hashCode);
+      this.before = null;
+      this.after = null;
+    }
+
+    public String toString() {
+      return super.toString();
+    }
+  }
+
+  private DoubleLinkedElement<T> head;
+  private DoubleLinkedElement<T> tail;
+
+  /**
+   * @param initCapacity
+   *          Recommended size of the internal array.
+   * @param maxLoadFactor
+   *          used to determine when to expand the internal array
+   * @param minLoadFactor
+   *          used to determine when to shrink the internal array
+   */
+  public LightWeightLinkedSet(int initCapacity, float maxLoadFactor,
+      float minLoadFactor) {
+    super(initCapacity, maxLoadFactor, minLoadFactor);
+    head = null;
+    tail = null;
+  }
+
+  public LightWeightLinkedSet() {
+    this(MINIMUM_CAPACITY, DEFAULT_MAX_LOAD_FACTOR, DEFAUT_MIN_LOAD_FACTOR);
+  }
+
+  /**
+   * Add given element to the hash table
+   *
+   * @return true if the element was not present in the table, false otherwise
+   */
+  protected boolean addElem(final T element) {
+    // validate element
+    if (element == null) {
+      throw new IllegalArgumentException("Null element is not supported.");
+    }
+    // find hashCode & index
+    final int hashCode = element.hashCode();
+    final int index = getIndex(hashCode);
+    // return false if already present
+    if (containsElem(index, element, hashCode)) {
+      return false;
+    }
+
+    modification++;
+    size++;
+
+    // update bucket linked list
+    DoubleLinkedElement<T> le = new DoubleLinkedElement<T>(element, hashCode);
+    le.next = entries[index];
+    entries[index] = le;
+
+    // insert to the end of the all-element linked list
+    le.after = null;
+    le.before = tail;
+    if (tail != null) {
+      tail.after = le;
+    }
+    tail = le;
+    if (head == null) {
+      head = le;
+    }
+    return true;
+  }
+
+  /**
+   * Remove the element corresponding to the key, given key.hashCode() == index.
+   *
+   * @return Return the entry with the element if exists. Otherwise return null.
+   */
+  protected DoubleLinkedElement<T> removeElem(final T key) {
+    DoubleLinkedElement<T> found = (DoubleLinkedElement<T>) (super
+        .removeElem(key));
+    if (found == null) {
+      return null;
+    }
+
+    // update linked list
+    if (found.after != null) {
+      found.after.before = found.before;
+    }
+    if (found.before != null) {
+      found.before.after = found.after;
+    }
+    if (head == found) {
+      head = head.after;
+    }
+    if (tail == found) {
+      tail = tail.before;
+    }
+    return found;
+  }
+
+  /**
+   * Remove and return first element on the linked list of all elements.
+   *
+   * @return first element
+   */
+  public T pollFirst() {
+    if (head == null) {
+      return null;
+    }
+    T first = head.element;
+    this.remove(first);
+    return first;
+  }
+
+  /**
+   * Remove and return n elements from the hashtable.
+   * The order in which entries are removed is corresponds 
+   * to the order in which they were inserted.
+   *
+   * @return first element
+   */
+  public List<T> pollN(int n) {
+    if (n >= size) {
+      // if we need to remove all elements then do fast polling
+      return pollAll();
+    }
+    List<T> retList = new ArrayList<T>(n);
+    while (n-- > 0 && head != null) {
+      T curr = head.element;
+      this.removeElem(curr);
+      retList.add(curr);
+    }
+    shrinkIfNecessary();
+    return retList;
+  }
+
+  /**
+   * Remove all elements from the set and return them in order. Traverse the
+   * link list, don't worry about hashtable - faster version of the parent
+   * method.
+   */
+  public List<T> pollAll() {
+    List<T> retList = new ArrayList<T>(size);
+    while (head != null) {
+      retList.add(head.element);
+      head = head.after;
+    }
+    this.clear();
+    return retList;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <U> U[] toArray(U[] a) {
+    if (a == null) {
+      throw new NullPointerException("Input array can not be null");
+    }
+    if (a.length < size) {
+      a = (U[]) java.lang.reflect.Array.newInstance(a.getClass()
+          .getComponentType(), size);
+    }
+    int currentIndex = 0;
+    DoubleLinkedElement<T> current = head;
+    while (current != null) {
+      T curr = current.element;
+      a[currentIndex++] = (U) curr;
+      current = current.after;
+    }
+    return a;
+  }
+
+  public Iterator<T> iterator() {
+    return new LinkedSetIterator();
+  }
+
+  private class LinkedSetIterator implements Iterator<T> {
+    /** The starting modification for fail-fast. */
+    private final int startModification = modification;
+    /** The next element to return. */
+    private DoubleLinkedElement<T> next = head;
+
+    @Override
+    public boolean hasNext() {
+      return next != null;
+    }
+
+    @Override
+    public T next() {
+      if (modification != startModification) {
+        throw new ConcurrentModificationException("modification="
+            + modification + " != startModification = " + startModification);
+      }
+      if (next == null) {
+        throw new NoSuchElementException();
+      }
+      final T e = next.element;
+      // find the next element
+      next = next.after;
+      return e;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported.");
+    }
+  }
+
+  /**
+   * Clear the set. Resize it to the original capacity.
+   */
+  public void clear() {
+    super.clear();
+    this.head = null;
+    this.tail = null;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java?rev=1201991&r1=1201990&r2=1201991&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java Tue Nov 15 01:13:58 2011
@@ -323,9 +323,10 @@ public class TestListCorruptFileBlocks {
       FSNamesystem.CorruptFileBlockInfo[] cfb = corruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       // now get the 2nd and 3rd file that is corrupt
+      String[] cookie = new String[]{"1"};
       Collection<FSNamesystem.CorruptFileBlockInfo> nextCorruptFileBlocks =
         namenode.getNamesystem()
-          .listCorruptFileBlocks("/corruptData", cfb[0].block.getBlockName());
+          .listCorruptFileBlocks("/corruptData", cookie);
       FSNamesystem.CorruptFileBlockInfo[] ncfb = nextCorruptFileBlocks
           .toArray(new FSNamesystem.CorruptFileBlockInfo[0]);
       numCorrupt = nextCorruptFileBlocks.size();
@@ -333,9 +334,9 @@ public class TestListCorruptFileBlocks {
       assertTrue(ncfb[0].block.getBlockName()
           .equalsIgnoreCase(cfb[1].block.getBlockName()));
 
-      corruptFileBlocks = 
-        namenode.getNamesystem().listCorruptFileBlocks("/corruptData",
-          ncfb[1].block.getBlockName());
+      corruptFileBlocks =
+        namenode.getNamesystem()
+          .listCorruptFileBlocks("/corruptData", cookie);
       numCorrupt = corruptFileBlocks.size();
       assertTrue(numCorrupt == 0);
       // Do a listing on a dir which doesn't have any corrupt blocks and

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java?rev=1201991&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java Tue Nov 15 01:13:58 2011
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestLightWeightHashSet{
+
+  private static final Log LOG = LogFactory
+      .getLog("org.apache.hadoop.hdfs.TestLightWeightHashSet");
+  private ArrayList<Integer> list = new ArrayList<Integer>();
+  private final int NUM = 100;
+  private LightWeightHashSet<Integer> set;
+  private Random rand;
+
+  @Before
+  public void setUp() {
+    float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR;
+    float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR;
+    int initCapacity = LightWeightHashSet.MINIMUM_CAPACITY;
+    rand = new Random(System.currentTimeMillis());
+    list.clear();
+    for (int i = 0; i < NUM; i++) {
+      list.add(rand.nextInt());
+    }
+    set = new LightWeightHashSet<Integer>(initCapacity, maxF, minF);
+  }
+
+  @Test
+  public void testEmptyBasic() {
+    LOG.info("Test empty basic");
+    Iterator<Integer> iter = set.iterator();
+    // iterator should not have next
+    assertFalse(iter.hasNext());
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+    LOG.info("Test empty - DONE");
+  }
+
+  @Test
+  public void testOneElementBasic() {
+    LOG.info("Test one element basic");
+    set.add(list.get(0));
+    // set should be non-empty
+    assertEquals(1, set.size());
+    assertFalse(set.isEmpty());
+
+    // iterator should have next
+    Iterator<Integer> iter = set.iterator();
+    assertTrue(iter.hasNext());
+
+    // iterator should not have next
+    assertEquals(list.get(0), iter.next());
+    assertFalse(iter.hasNext());
+    LOG.info("Test one element basic - DONE");
+  }
+
+  @Test
+  public void testMultiBasic() {
+    LOG.info("Test multi element basic");
+    // add once
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    assertEquals(list.size(), set.size());
+
+    // check if the elements are in the set
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    // add again - should return false each time
+    for (Integer i : list) {
+      assertFalse(set.add(i));
+    }
+
+    // check again if the elements are there
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    int num = 0;
+    while (iter.hasNext()) {
+      Integer next = iter.next();
+      assertNotNull(next);
+      assertTrue(list.contains(next));
+      num++;
+    }
+    // check the number of element from the iterator
+    assertEquals(list.size(), num);
+    LOG.info("Test multi element basic - DONE");
+  }
+
+  @Test
+  public void testRemoveOne() {
+    LOG.info("Test remove one");
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    // remove from the head/tail
+    assertTrue(set.remove(list.get(0)));
+    assertEquals(0, set.size());
+
+    // check the iterator
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    // add the element back to the set
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    iter = set.iterator();
+    assertTrue(iter.hasNext());
+    LOG.info("Test remove one - DONE");
+  }
+
+  @Test
+  public void testRemoveMulti() {
+    LOG.info("Test remove multi");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM / 2; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM / 2; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // the rest should be there
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+    LOG.info("Test remove multi - DONE");
+  }
+
+  @Test
+  public void testRemoveAll() {
+    LOG.info("Test remove all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // iterator should not have next
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    assertTrue(set.isEmpty());
+    LOG.info("Test remove all - DONE");
+  }
+
+  @Test
+  public void testPollAll() {
+    LOG.info("Test poll all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    // remove all elements by polling
+    List<Integer> poll = set.pollAll();
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // we should get all original items
+    for (Integer i : poll) {
+      assertTrue(list.contains(i));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    LOG.info("Test poll all - DONE");
+  }
+
+  @Test
+  public void testPollNMulti() {
+    LOG.info("Test pollN multi");
+
+    // use addAll
+    set.addAll(list);
+
+    // poll zero
+    List<Integer> poll = set.pollN(0);
+    assertEquals(0, poll.size());
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    // poll existing elements (less than size)
+    poll = set.pollN(10);
+    assertEquals(10, poll.size());
+
+    for (Integer i : poll) {
+      // should be in original items
+      assertTrue(list.contains(i));
+      // should not be in the set anymore
+      assertFalse(set.contains(i));
+    }
+
+    // poll more elements than present
+    poll = set.pollN(1000);
+    assertEquals(NUM - 10, poll.size());
+
+    for (Integer i : poll) {
+      // should be in original items
+      assertTrue(list.contains(i));
+    }
+
+    // set is empty
+    assertTrue(set.isEmpty());
+    assertEquals(0, set.size());
+
+    LOG.info("Test pollN multi - DONE");
+  }
+
+  @Test
+  public void testPollNMultiArray() {
+    LOG.info("Test pollN multi array");
+
+    // use addAll
+    set.addAll(list);
+
+    // poll existing elements (less than size)
+    Integer[] poll = new Integer[10];
+    poll = set.pollToArray(poll);
+    assertEquals(10, poll.length);
+
+    for (Integer i : poll) {
+      // should be in original items
+      assertTrue(list.contains(i));
+      // should not be in the set anymore
+      assertFalse(set.contains(i));
+    }
+
+    // poll other elements (more than size)
+    poll = new Integer[NUM];
+    poll = set.pollToArray(poll);
+    assertEquals(NUM - 10, poll.length);
+
+    for (int i = 0; i < NUM - 10; i++) {
+      assertTrue(list.contains(poll[i]));
+    }
+
+    // set is empty
+    assertTrue(set.isEmpty());
+    assertEquals(0, set.size());
+
+    // //////
+    set.addAll(list);
+    // poll existing elements (exactly the size)
+    poll = new Integer[NUM];
+    poll = set.pollToArray(poll);
+    assertTrue(set.isEmpty());
+    assertEquals(0, set.size());
+    assertEquals(NUM, poll.length);
+    for (int i = 0; i < NUM; i++) {
+      assertTrue(list.contains(poll[i]));
+    }
+    // //////
+
+    // //////
+    set.addAll(list);
+    // poll existing elements (exactly the size)
+    poll = new Integer[0];
+    poll = set.pollToArray(poll);
+    for (int i = 0; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+    assertEquals(0, poll.length);
+    // //////
+
+    LOG.info("Test pollN multi array- DONE");
+  }
+
+  @Test
+  public void testClear() {
+    LOG.info("Test clear");
+    // use addAll
+    set.addAll(list);
+    assertEquals(NUM, set.size());
+    assertFalse(set.isEmpty());
+
+    // clear the set
+    set.clear();
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // iterator should be empty
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    LOG.info("Test clear - DONE");
+  }
+
+  @Test
+  public void testCapacity() {
+    LOG.info("Test capacity");
+    float maxF = LightWeightHashSet.DEFAULT_MAX_LOAD_FACTOR;
+    float minF = LightWeightHashSet.DEFAUT_MIN_LOAD_FACTOR;
+
+    // capacity lower than min_capacity
+    set = new LightWeightHashSet<Integer>(1, maxF, minF);
+    assertEquals(LightWeightHashSet.MINIMUM_CAPACITY, set.getCapacity());
+
+    // capacity not a power of two
+    set = new LightWeightHashSet<Integer>(30, maxF, minF);
+    assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 32),
+        set.getCapacity());
+
+    // capacity valid
+    set = new LightWeightHashSet<Integer>(64, maxF, minF);
+    assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, 64),
+        set.getCapacity());
+
+    // add NUM elements
+    set.addAll(list);
+    int expCap = LightWeightHashSet.MINIMUM_CAPACITY;
+    while (expCap < NUM && maxF * expCap < NUM)
+      expCap <<= 1;
+    assertEquals(expCap, set.getCapacity());
+
+    // see if the set shrinks if we remove elements by removing
+    set.clear();
+    set.addAll(list);
+    int toRemove = set.size() - (int) (set.getCapacity() * minF) + 1;
+    for (int i = 0; i < toRemove; i++) {
+      set.remove(list.get(i));
+    }
+    assertEquals(Math.max(LightWeightHashSet.MINIMUM_CAPACITY, expCap / 2),
+        set.getCapacity());
+
+    LOG.info("Test capacity - DONE");
+  }
+
+  @Test
+  public void testOther() {
+    LOG.info("Test other");
+
+    // remove all
+    assertTrue(set.addAll(list));
+    assertTrue(set.removeAll(list));
+    assertTrue(set.isEmpty());
+
+    // remove sublist
+    List<Integer> sub = new LinkedList<Integer>();
+    for (int i = 0; i < 10; i++) {
+      sub.add(list.get(i));
+    }
+    assertTrue(set.addAll(list));
+    assertTrue(set.removeAll(sub));
+    assertFalse(set.isEmpty());
+    assertEquals(NUM - 10, set.size());
+
+    for (Integer i : sub) {
+      assertFalse(set.contains(i));
+    }
+
+    assertFalse(set.containsAll(sub));
+
+    // the rest of the elements should be there
+    List<Integer> sub2 = new LinkedList<Integer>();
+    for (int i = 10; i < NUM; i++) {
+      sub2.add(list.get(i));
+    }
+    assertTrue(set.containsAll(sub2));
+
+    // to array
+    Integer[] array = set.toArray(new Integer[0]);
+    assertEquals(NUM - 10, array.length);
+    for (int i = 0; i < array.length; i++) {
+      assertTrue(sub2.contains(array[i]));
+    }
+    assertEquals(NUM - 10, set.size());
+
+    // to array
+    Object[] array2 = set.toArray();
+    assertEquals(NUM - 10, array2.length);
+
+    for (int i = 0; i < array2.length; i++) {
+      assertTrue(sub2.contains((Integer) array2[i]));
+    }
+
+    LOG.info("Test other - DONE");
+  }
+
+}
\ No newline at end of file

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java?rev=1201991&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java Tue Nov 15 01:13:58 2011
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.junit.Before;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
+public class TestLightWeightLinkedSet {
+
+  private static final Log LOG = LogFactory
+	  .getLog("org.apache.hadoop.hdfs.TestLightWeightLinkedSet");
+  private ArrayList<Integer> list = new ArrayList<Integer>();
+  private final int NUM = 100;
+  private LightWeightLinkedSet<Integer> set;
+  private Random rand;
+
+  @Before
+  public void setUp() {
+    float maxF = LightWeightLinkedSet.DEFAULT_MAX_LOAD_FACTOR;
+    float minF = LightWeightLinkedSet.DEFAUT_MIN_LOAD_FACTOR;
+    int initCapacity = LightWeightLinkedSet.MINIMUM_CAPACITY;
+    rand = new Random(System.currentTimeMillis());
+    list.clear();
+    for (int i = 0; i < NUM; i++) {
+      list.add(rand.nextInt());
+    }
+    set = new LightWeightLinkedSet<Integer>(initCapacity, maxF, minF);
+  }
+
+  @Test
+  public void testEmptyBasic() {
+    LOG.info("Test empty basic");
+    Iterator<Integer> iter = set.iterator();
+    // iterator should not have next
+    assertFalse(iter.hasNext());
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // poll should return nothing
+    assertNull(set.pollFirst());
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+
+    LOG.info("Test empty - DONE");
+  }
+
+  @Test
+  public void testOneElementBasic() {
+    LOG.info("Test one element basic");
+    set.add(list.get(0));
+    // set should be non-empty
+    assertEquals(1, set.size());
+    assertFalse(set.isEmpty());
+
+    // iterator should have next
+    Iterator<Integer> iter = set.iterator();
+    assertTrue(iter.hasNext());
+
+    // iterator should not have next
+    assertEquals(list.get(0), iter.next());
+    assertFalse(iter.hasNext());
+    LOG.info("Test one element basic - DONE");
+  }
+
+  @Test
+  public void testMultiBasic() {
+    LOG.info("Test multi element basic");
+    // add once
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    assertEquals(list.size(), set.size());
+
+    // check if the elements are in the set
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    // add again - should return false each time
+    for (Integer i : list) {
+      assertFalse(set.add(i));
+    }
+
+    // check again if the elements are there
+    for (Integer i : list) {
+      assertTrue(set.contains(i));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    int num = 0;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    // check the number of element from the iterator
+    assertEquals(list.size(), num);
+    LOG.info("Test multi element basic - DONE");
+  }
+
+  @Test
+  public void testRemoveOne() {
+    LOG.info("Test remove one");
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    // remove from the head/tail
+    assertTrue(set.remove(list.get(0)));
+    assertEquals(0, set.size());
+
+    // check the iterator
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    // poll should return nothing
+    assertNull(set.pollFirst());
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+
+    // add the element back to the set
+    assertTrue(set.add(list.get(0)));
+    assertEquals(1, set.size());
+
+    iter = set.iterator();
+    assertTrue(iter.hasNext());
+    LOG.info("Test remove one - DONE");
+  }
+
+  @Test
+  public void testRemoveMulti() {
+    LOG.info("Test remove multi");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM / 2; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM / 2; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // the rest should be there
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    // the remaining elements should be in order
+    int num = NUM / 2;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    assertEquals(num, NUM);
+    LOG.info("Test remove multi - DONE");
+  }
+
+  @Test
+  public void testRemoveAll() {
+    LOG.info("Test remove all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    for (int i = 0; i < NUM; i++) {
+      assertTrue(set.remove(list.get(i)));
+    }
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    // iterator should not have next
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    assertTrue(set.isEmpty());
+    LOG.info("Test remove all - DONE");
+  }
+
+  @Test
+  public void testPollOneElement() {
+    LOG.info("Test poll one element");
+    set.add(list.get(0));
+    assertEquals(list.get(0), set.pollFirst());
+    assertNull(set.pollFirst());
+    LOG.info("Test poll one element - DONE");
+  }
+
+  @Test
+  public void testPollMulti() {
+    LOG.info("Test poll multi");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    // remove half of the elements by polling
+    for (int i = 0; i < NUM / 2; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    assertEquals(NUM / 2, set.size());
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM / 2; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+    // the rest should be there
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertTrue(set.contains(list.get(i)));
+    }
+    Iterator<Integer> iter = set.iterator();
+    // the remaining elements should be in order
+    int num = NUM / 2;
+    while (iter.hasNext()) {
+      assertEquals(list.get(num++), iter.next());
+    }
+    assertEquals(num, NUM);
+
+    // add elements back
+    for (int i = 0; i < NUM / 2; i++) {
+      assertTrue(set.add(list.get(i)));
+    }
+    // order should be switched
+    assertEquals(NUM, set.size());
+    for (int i = NUM / 2; i < NUM; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    for (int i = 0; i < NUM / 2; i++) {
+      assertEquals(list.get(i), set.pollFirst());
+    }
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+    LOG.info("Test poll multi - DONE");
+  }
+
+  @Test
+  public void testPollAll() {
+    LOG.info("Test poll all");
+    for (Integer i : list) {
+      assertTrue(set.add(i));
+    }
+    // remove all elements by polling
+    while (set.pollFirst() != null);
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // the deleted elements should not be there
+    for (int i = 0; i < NUM; i++) {
+      assertFalse(set.contains(list.get(i)));
+    }
+
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+    LOG.info("Test poll all - DONE");
+  }
+
+  @Test
+  public void testPollNOne() {
+    LOG.info("Test pollN one");
+    set.add(list.get(0));
+    List<Integer> l = set.pollN(10);
+    assertEquals(1, l.size());
+    assertEquals(list.get(0), l.get(0));
+    LOG.info("Test pollN one - DONE");
+  }
+
+  @Test
+  public void testPollNMulti() {
+    LOG.info("Test pollN multi");
+
+    // use addAll
+    set.addAll(list);
+
+    // poll existing elements
+    List<Integer> l = set.pollN(10);
+    assertEquals(10, l.size());
+
+    for (int i = 0; i < 10; i++) {
+      assertEquals(list.get(i), l.get(i));
+    }
+
+    // poll more elements than present
+    l = set.pollN(1000);
+    assertEquals(NUM - 10, l.size());
+
+    // check the order
+    for (int i = 10; i < NUM; i++) {
+      assertEquals(list.get(i), l.get(i - 10));
+    }
+    // set is empty
+    assertTrue(set.isEmpty());
+    assertEquals(0, set.size());
+
+    LOG.info("Test pollN multi - DONE");
+  }
+
+  @Test
+  public void testClear() {
+    LOG.info("Test clear");
+    // use addAll
+    set.addAll(list);
+    assertEquals(NUM, set.size());
+    assertFalse(set.isEmpty());
+
+    // clear the set
+    set.clear();
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+
+    // poll should return an empty list
+    assertEquals(0, set.pollAll().size());
+    assertEquals(0, set.pollN(10).size());
+    assertNull(set.pollFirst());
+
+    // iterator should be empty
+    Iterator<Integer> iter = set.iterator();
+    assertFalse(iter.hasNext());
+
+    LOG.info("Test clear - DONE");
+  }
+
+  @Test
+  public void testOther() {
+    LOG.info("Test other");
+    assertTrue(set.addAll(list));
+    // to array
+    Integer[] array = set.toArray(new Integer[0]);
+    assertEquals(NUM, array.length);
+    for (int i = 0; i < array.length; i++) {
+      assertTrue(list.contains(array[i]));
+    }
+    assertEquals(NUM, set.size());
+
+    // to array
+    Object[] array2 = set.toArray();
+    assertEquals(NUM, array2.length);
+    for (int i = 0; i < array2.length; i++) {
+      assertTrue(list.contains((Integer) array2[i]));
+    }
+    LOG.info("Test capacity - DONE");
+  }
+
+}
\ No newline at end of file