You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:00 UTC

svn commit: r1542125 [1/2] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/libexec/ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/h...

Author: arp
Date: Thu Nov 14 23:56:56 2013
New Revision: 1542125

URL: http://svn.apache.org/r1542125
Log:
Merging r1541618 through r1542122 from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestTokenAspect.java
      - copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestTokenAspect.java
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
    hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1541618-1542122

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh Thu Nov 14 23:56:56 2013
@@ -55,7 +55,7 @@ print "Setting HTTPFS_HOME:          ${H
 #
 if [ -e "${HTTPFS_HOME}/bin/httpfs-env.sh" ]; then
   print "Sourcing:                    ${HTTPFS_HOME}/bin/httpfs-env.sh"
-  source ${HTTPFS_HOME}/bin/HTTPFS-env.sh
+  source ${HTTPFS_HOME}/bin/httpfs-env.sh
   grep "^ *export " ${HTTPFS_HOME}/bin/httpfs-env.sh | sed 's/ *export/  setting/'
 fi
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 14 23:56:56 2013
@@ -13,6 +13,9 @@ Trunk (Unreleased)
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
+    HDFS-5444. Choose default web UI based on browser capabilities. (Haohui Mai
+    via jing9)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -194,6 +197,8 @@ Trunk (Unreleased)
 
     HDFS-5485. Add command-line support for modifyDirective. (cmccabe)
 
+    HDFS-5366. recaching improvements (cmccabe)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
@@ -475,6 +480,20 @@ Release 2.3.0 - UNRELEASED
     HDFS-5495. Remove further JUnit3 usages from HDFS.
     (Jarek Jarcec Cecho via wang)
 
+    HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) 
+
+    HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
+
+    HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem 
+    to the TokenAspect class. (Haohui Mai via jing9)
+
+    HDFS-5487. Introduce unit test for TokenAspect. (Haohui Mai via jing9)
+
+    HDFS-4995. Make getContentSummary less expensive. (kihwal)
+
+    HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
+    via jing9)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -529,13 +548,20 @@ Release 2.3.0 - UNRELEASED
     HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed 
     file/directory while deletion. (jing9)
 
-    HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) 
-
-    HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
-
     HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on 
     restart. (jing9 and Vinay)
 
+    HDFS-5474. Deletesnapshot can make Namenode in safemode on NN restarts. 
+    (Sathish via jing9)
+
+    HDFS-5075. httpfs-config.sh calls out incorrect env script name
+    (Timothy St. Clair via stevel)
+
+    HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold, 
+    leads to NN safemode. (Vinay via jing9)
+
+    HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1541618-1542122

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Thu Nov 14 23:56:56 2013
@@ -42,4 +42,8 @@ public class DFSClientFaultInjector {
   public boolean uncorruptPacket() {
     return false;
   }
+
+  public boolean failPacket() {
+    return false;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov 14 23:56:56 2013
@@ -198,6 +198,8 @@ public class DFSConfigKeys extends Commo
   
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
+  public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
+  public static final int     DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 0;
   public static final String  DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int     DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
   public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
@@ -342,6 +344,8 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
   public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
+  public static final String  DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
+  public static final long    DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 60000L;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
   public static final int     DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
   public static final String  DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Nov 14 23:56:56 2013
@@ -151,6 +151,7 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   private CachingStrategy cachingStrategy;
+  private boolean failPacket = false;
   
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
@@ -752,6 +753,16 @@ public class DFSOutputStream extends FSO
                                     one.seqno + " but received " + seqno);
             }
             isLastPacketInBlock = one.lastPacketInBlock;
+
+            // Fail the packet write for testing in order to force a
+            // pipeline recovery.
+            if (DFSClientFaultInjector.get().failPacket() &&
+                isLastPacketInBlock) {
+              failPacket = true;
+              throw new IOException(
+                    "Failing the last packet for testing.");
+            }
+              
             // update bytesAcked
             block.setNumBytes(one.getLastByteOffsetBlock());
 
@@ -1044,7 +1055,18 @@ public class DFSOutputStream extends FSO
         accessToken = lb.getBlockToken();
         
         // set up the pipeline again with the remaining nodes
-        success = createBlockOutputStream(nodes, newGS, isRecovery);
+        if (failPacket) { // for testing
+          success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+          failPacket = false;
+          try {
+            // Give DNs time to send in bad reports. In real situations,
+            // good reports should follow bad ones, if client committed
+            // with those nodes.
+            Thread.sleep(2000);
+          } catch (InterruptedException ie) {}
+        } else {
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
+        }
       }
 
       if (success) {
@@ -1904,7 +1926,9 @@ public class DFSOutputStream extends FSO
   // be called during unit tests
   private void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.now();
+    long localTimeout = 400;
     boolean fileComplete = false;
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     while (!fileComplete) {
       fileComplete =
           dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
@@ -1920,7 +1944,13 @@ public class DFSOutputStream extends FSO
             throw new IOException(msg);
         }
         try {
-          Thread.sleep(400);
+          Thread.sleep(localTimeout);
+          if (retries == 0) {
+            throw new IOException("Unable to close file because the last block"
+                + " does not have enough number of replicas.");
+          }
+          retries--;
+          localTimeout *= 2;
           if (Time.now() - localstart > 5000) {
             DFSClient.LOG.info("Could not complete " + src + " retrying...");
           }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Thu Nov 14 23:56:56 2013
@@ -230,6 +230,29 @@ public class BlockInfoUnderConstruction 
   }
 
   /**
+   * Process the recorded replicas. When about to commit or finish the
+   * pipeline recovery sort out bad replicas.
+   * @param genStamp  The final generation stamp for the block.
+   */
+  public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    if (replicas == null)
+      return;
+
+    // Remove the replicas with wrong gen stamp.
+    // The replica list is unchanged.
+    for (ReplicaUnderConstruction r : replicas) {
+      if (genStamp != r.getGenerationStamp()) {
+        r.getExpectedStorageLocation().removeBlock(this);
+        NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+            + "from location: " + r);
+      }
+    }
+
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
+  }
+
+  /**
    * Commit block's length and generation stamp as reported by the client.
    * Set block state to {@link BlockUCState#COMMITTED}.
    * @param block - contains client reported block length and generation 
@@ -301,6 +324,8 @@ public class BlockInfoUnderConstruction 
     while (it.hasNext()) {
       ReplicaUnderConstruction r = it.next();
       if(r.getExpectedStorageLocation() == storage) {
+        // Record the gen stamp from the report
+        r.setGenerationStamp(block.getGenerationStamp());
         return;
       } else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
           storage.getDatanodeDescriptor()) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Nov 14 23:56:56 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1051,7 +1052,8 @@ public class BlockManager {
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
+        Reason.CORRUPTION_REPORTED), dn, storageID);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1074,7 +1076,8 @@ public class BlockManager {
     node.addBlock(storageID, b.stored);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+        b.reasonCode);
     if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
@@ -1574,22 +1577,27 @@ public class BlockManager {
     final BlockInfo stored;
     /** The reason to mark corrupt. */
     final String reason;
-    
-    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+    /** The reason code to be stored */
+    final Reason reasonCode;
+
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+        Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
       Preconditions.checkNotNull(stored, "stored is null");
 
       this.corrupted = corrupted;
       this.stored = stored;
       this.reason = reason;
+      this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, String reason) {
-      this(stored, stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
+      this(stored, stored, reason, reasonCode);
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
-      this(new BlockInfo(stored), stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+        Reason reasonCode) {
+      this(new BlockInfo(stored), stored, reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -1946,9 +1954,11 @@ assert storedBlock.findDatanode(dn) < 0 
       return storedBlock;
     }
 
-    //add replica if appropriate
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && storedBlock.findDatanode(dn) < 0) {
+        && (storedBlock.findDatanode(dn) < 0
+        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;
@@ -2039,12 +2049,13 @@ assert storedBlock.findDatanode(dn) < 0 
           return new BlockToMarkCorrupt(storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
-              + storedBlock.getGenerationStamp());
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
           return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +
-              "length in block map " + storedBlock.getNumBytes());
+              "length in block map " + storedBlock.getNumBytes(),
+              Reason.SIZE_MISMATCH);
         } else {
           return null; // not corrupt
         }
@@ -2060,7 +2071,7 @@ assert storedBlock.findDatanode(dn) < 0 
         return new BlockToMarkCorrupt(storedBlock, reportedGS,
             "reported " + reportedState + " replica with genstamp " + reportedGS
             + " does not match COMPLETE block's genstamp in block map "
-            + storedBlock.getGenerationStamp());
+            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -2073,7 +2084,8 @@ assert storedBlock.findDatanode(dn) < 0 
           return null;
         } else {
           return new BlockToMarkCorrupt(storedBlock,
-              "reported replica has invalid state " + reportedState);
+              "reported replica has invalid state " + reportedState,
+              Reason.INVALID_STATE);
         }
       }
     case RUR:       // should not be reported
@@ -2084,7 +2096,7 @@ assert storedBlock.findDatanode(dn) < 0 
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg);
+      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
     }
   }
 
@@ -2201,6 +2213,11 @@ assert storedBlock.findDatanode(dn) < 0 
         logAddStoredBlock(storedBlock, node);
       }
     } else {
+      // if the same block is added again and the replica was corrupt
+      // previously because of a wrong gen stamp, remove it from the
+      // corrupt block list.
+      corruptReplicas.removeFromCorruptReplicasMap(block, node,
+          Reason.GENSTAMP_MISMATCH);
       curReplicaDelta = 0;
       blockLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
@@ -2297,7 +2314,8 @@ assert storedBlock.findDatanode(dn) < 0 
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+              Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Nov 14 23:56:56 2013
@@ -204,6 +204,7 @@ public class CacheReplicationMonitor ext
     namesystem.writeLock();
     try {
       rescanCachedBlockMap();
+      blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {
       namesystem.writeUnlock();
     }
@@ -316,17 +317,21 @@ public class CacheReplicationMonitor ext
       int numCached = cached.size();
       if (numCached >= neededCached) {
         // If we have enough replicas, drop all pending cached.
-        for (DatanodeDescriptor datanode : pendingCached) {
+        for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
+            iter.hasNext(); ) {
+          DatanodeDescriptor datanode = iter.next();
           datanode.getPendingCached().remove(cblock);
+          iter.remove();
         }
-        pendingCached.clear();
       }
       if (numCached < neededCached) {
         // If we don't have enough replicas, drop all pending uncached.
-        for (DatanodeDescriptor datanode : pendingUncached) {
+        for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+            iter.hasNext(); ) {
+          DatanodeDescriptor datanode = iter.next();
           datanode.getPendingUncached().remove(cblock);
+          iter.remove();
         }
-        pendingUncached.clear();
       }
       int neededUncached = numCached -
           (pendingUncached.size() + neededCached);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Thu Nov 14 23:56:56 2013
@@ -36,8 +36,18 @@ import java.util.*;
 @InterfaceAudience.Private
 public class CorruptReplicasMap{
 
-  private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
-    new TreeMap<Block, Collection<DatanodeDescriptor>>();
+  /** The corruption reason code */
+  public static enum Reason {
+    NONE,                // not specified.
+    ANY,                 // wildcard reason
+    GENSTAMP_MISMATCH,   // mismatch in generation stamps
+    SIZE_MISMATCH,       // mismatch in sizes
+    INVALID_STATE,       // invalid state
+    CORRUPTION_REPORTED  // client or datanode reported the corruption
+  }
+
+  private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
   
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
    */
   public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason) {
-    Collection<DatanodeDescriptor> nodes = getNodes(blk);
+    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+  }
+
+  /**
+   * Mark the block belonging to datanode as corrupt.
+   *
+   * @param blk Block to be added to CorruptReplicasMap
+   * @param dn DatanodeDescriptor which holds the corrupt replica
+   * @param reason a textual reason (for logging purposes)
+   * @param reasonCode the enum representation of the reason
+   */
+  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+      String reason, Reason reasonCode) {
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new TreeSet<DatanodeDescriptor>();
+      nodes = new HashMap<DatanodeDescriptor, Reason>();
       corruptReplicasMap.put(blk, nodes);
     }
     
@@ -61,8 +84,7 @@ public class CorruptReplicasMap{
       reasonText = "";
     }
     
-    if (!nodes.contains(dn)) {
-      nodes.add(dn);
+    if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    blk.getBlockName() +
                                    " added as corrupt on " + dn +
@@ -76,6 +98,8 @@ public class CorruptReplicasMap{
                                    " by " + Server.getRemoteIp() +
                                    reasonText);
     }
+    // Add the node or update the reason.
+    nodes.put(dn, reasonCode);
   }
 
   /**
@@ -97,10 +121,24 @@ public class CorruptReplicasMap{
              false if the replica is not in the map
    */ 
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
-    Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+    return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+  }
+
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+      Reason reason) {
+    Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+    boolean removed = false;
     if (datanodes==null)
       return false;
-    if (datanodes.remove(datanode)) { // remove the replicas
+
+    // if reasons can be compared but don't match, return false.
+    Reason storedReason = datanodes.get(datanode);
+    if (reason != Reason.ANY && storedReason != null &&
+        reason != storedReason) {
+      return false;
+    }
+
+    if (datanodes.remove(datanode) != null) { // remove the replicas
       if (datanodes.isEmpty()) {
         // remove the block if there is no more corrupted replicas
         corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ public class CorruptReplicasMap{
    * @return collection of nodes. Null if does not exists
    */
   Collection<DatanodeDescriptor> getNodes(Block blk) {
-    return corruptReplicasMap.get(blk);
+    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    if (nodes == null)
+      return null;
+    return nodes.keySet();
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Thu Nov 14 23:56:56 2013
@@ -170,6 +170,21 @@ public class DatanodeDescriptor extends 
     return pendingUncached;
   }
 
+  /**
+   * The time when the last batch of caching directives was sent, in
+   * monotonic milliseconds.
+   */
+  private long lastCachingDirectiveSentTimeMs;
+
+  /**
+   * Head of the list of blocks on the datanode
+   */
+  private volatile BlockInfo blockList = null;
+  /**
+   * Number of blocks on the datanode
+   */
+  private int numBlocks = 0;
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -661,5 +676,21 @@ public class DatanodeDescriptor extends 
       return storage;
     }
   }
+
+  /**
+   * @return   The time at which we last sent caching directives to this 
+   *           DataNode, in monotonic milliseconds.
+   */
+  public long getLastCachingDirectiveSentTimeMs() {
+    return this.lastCachingDirectiveSentTimeMs;
+  }
+
+  /**
+   * @param time  The time at which we last sent caching directives to this 
+   *              DataNode, in monotonic milliseconds.
+   */
+  public void setLastCachingDirectiveSentTimeMs(long time) {
+    this.lastCachingDirectiveSentTimeMs = time;
+  }
 }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Nov 14 23:56:56 2013
@@ -149,7 +149,7 @@ public class DatanodeManager {
    * Whether we should tell datanodes what to cache in replies to
    * heartbeat messages.
    */
-  private boolean sendCachingCommands = false;
+  private boolean shouldSendCachingCommands = false;
 
   /**
    * The number of datanodes for each software version. This list should change
@@ -159,6 +159,16 @@ public class DatanodeManager {
   private HashMap<String, Integer> datanodesSoftwareVersions =
     new HashMap<String, Integer>(4, 0.75f);
   
+  /**
+   * The minimum time between resending caching directives to Datanodes,
+   * in milliseconds.
+   *
+   * Note that when a rescan happens, we will send the new directives
+   * as soon as possible.  This timeout only applies to resending 
+   * directives that we've already sent.
+   */
+  private final long timeBetweenResendingCachingDirectivesMs;
+  
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -241,6 +251,9 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
+    this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1297,17 +1310,28 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
               blockPoolId, blks));
         }
-        DatanodeCommand pendingCacheCommand =
-            getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
-              DatanodeProtocol.DNA_CACHE, blockPoolId);
-        if (pendingCacheCommand != null) {
-          cmds.add(pendingCacheCommand);
-        }
-        DatanodeCommand pendingUncacheCommand =
-            getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
-              DatanodeProtocol.DNA_UNCACHE, blockPoolId);
-        if (pendingUncacheCommand != null) {
-          cmds.add(pendingUncacheCommand);
+        boolean sendingCachingCommands = false;
+        long nowMs = Time.monotonicNow();
+        if (shouldSendCachingCommands && 
+            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+                timeBetweenResendingCachingDirectivesMs)) {
+          DatanodeCommand pendingCacheCommand =
+              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+                DatanodeProtocol.DNA_CACHE, blockPoolId);
+          if (pendingCacheCommand != null) {
+            cmds.add(pendingCacheCommand);
+            sendingCachingCommands = true;
+          }
+          DatanodeCommand pendingUncacheCommand =
+              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+          if (pendingUncacheCommand != null) {
+            cmds.add(pendingUncacheCommand);
+            sendingCachingCommands = true;
+          }
+          if (sendingCachingCommands) {
+            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+          }
         }
 
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@@ -1345,19 +1369,13 @@ public class DatanodeManager {
     if (length == 0) {
       return null;
     }
-    // Read and clear the existing cache commands.
+    // Read the existing cache commands.
     long[] blockIds = new long[length];
     int i = 0;
     for (Iterator<CachedBlock> iter = list.iterator();
             iter.hasNext(); ) {
       CachedBlock cachedBlock = iter.next();
       blockIds[i++] = cachedBlock.getBlockId();
-      iter.remove();
-    }
-    if (!sendCachingCommands) {
-      // Do not send caching commands unless the FSNamesystem told us we
-      // should.
-      return null;
     }
     return new BlockIdCommand(action, poolId, blockIds);
   }
@@ -1408,13 +1426,25 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we
+   * know about.
+   */
+  public void resetLastCachingDirectiveSentTime() {
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.setLastCachingDirectiveSentTimeMs(0L);
+      }
+    }
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DatanodeMap;
   }
 
-  public void setSendCachingCommands(boolean sendCachingCommands) {
-    this.sendCachingCommands = sendCachingCommands;
+  public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
+    this.shouldSendCachingCommands = shouldSendCachingCommands;
   }
 }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Thu Nov 14 23:56:56 2013
@@ -289,6 +289,10 @@ public class FsDatasetCache {
     mappableBlockMap.put(key, new Value(null, State.CACHING));
     volumeExecutor.execute(
         new CachingTask(key, blockFileName, length, genstamp));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initiating caching for Block with id " + blockId +
+          ", pool " + bpid);
+    }
   }
 
   synchronized void uncacheBlock(String bpid, long blockId) {
@@ -427,6 +431,10 @@ public class FsDatasetCache {
             mappableBlock.close();
           }
           numBlocksFailedToCache.incrementAndGet();
+
+          synchronized (FsDatasetCache.this) {
+            mappableBlockMap.remove(key);
+          }
         }
       }
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java Thu Nov 14 23:56:56 2013
@@ -44,20 +44,6 @@ import com.google.common.base.Preconditi
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class MappableBlock implements Closeable {
-  public static interface Mlocker {
-    void mlock(MappedByteBuffer mmap, long length) throws IOException;
-  }
-  
-  private static class PosixMlocker implements Mlocker {
-    public void mlock(MappedByteBuffer mmap, long length)
-        throws IOException {
-      NativeIO.POSIX.mlock(mmap, length);
-    }
-  }
-
-  @VisibleForTesting
-  public static Mlocker mlocker = new PosixMlocker();
-
   private MappedByteBuffer mmap;
   private final long length;
 
@@ -96,7 +82,7 @@ public class MappableBlock implements Cl
         throw new IOException("Block InputStream has no FileChannel.");
       }
       mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
-      mlocker.mlock(mmap, length);
+      NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
       verifyChecksum(length, metaIn, blockChannel, blockFileName);
       mappableBlock = new MappableBlock(mmap, length);
     } finally {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Nov 14 23:56:56 2013
@@ -114,7 +114,9 @@ public class FSDirectory implements Clos
   private final int maxComponentLength;
   private final int maxDirItems;
   private final int lsLimit;  // max list limit
+  private final int contentCountLimit; // max content summary counts per run
   private final INodeMap inodeMap; // Synchronized by dirLock
+  private long yieldCount = 0; // keep track of lock yield count.
 
   // lock to protect the directory and BlockMap
   private ReentrantReadWriteLock dirLock;
@@ -145,6 +147,14 @@ public class FSDirectory implements Clos
     return this.dirLock.getReadHoldCount() > 0;
   }
 
+  public int getReadHoldCount() {
+    return this.dirLock.getReadHoldCount();
+  }
+
+  public int getWriteHoldCount() {
+    return this.dirLock.getWriteHoldCount();
+  }
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * byte[] objects and reduce heap usage.
@@ -161,6 +171,10 @@ public class FSDirectory implements Clos
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
         configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
+
+    this.contentCountLimit = conf.getInt(
+        DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
+        DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
     
     // filesystem limits
     this.maxComponentLength = conf.getInt(
@@ -2296,13 +2310,26 @@ public class FSDirectory implements Clos
         throw new FileNotFoundException("File does not exist: " + srcs);
       }
       else {
-        return targetNode.computeContentSummary();
+        // Make it relinquish locks everytime contentCountLimit entries are
+        // processed. 0 means disabled. I.e. blocking for the entire duration.
+        ContentSummaryComputationContext cscc =
+
+            new ContentSummaryComputationContext(this, getFSNamesystem(),
+            contentCountLimit);
+        ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
+        yieldCount += cscc.getYieldCount();
+        return cs;
       }
     } finally {
       readUnlock();
     }
   }
 
+  @VisibleForTesting
+  public long getYieldCount() {
+    return yieldCount;
+  }
+
   public INodeMap getINodeMap() {
     return inodeMap;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 14 23:56:56 2013
@@ -593,7 +593,7 @@ public class FSEditLogLoader {
       fsNamesys.getSnapshotManager().deleteSnapshot(
           deleteSnapshotOp.snapshotRoot, deleteSnapshotOp.snapshotName,
           collectedBlocks, removedINodes);
-      fsNamesys.removeBlocks(collectedBlocks);
+      fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
       collectedBlocks.clear();
       fsNamesys.dir.removeFromInodeMap(removedINodes);
       removedINodes.clear();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 14 23:56:56 2013
@@ -1009,7 +1009,7 @@ public class FSNamesystem implements Nam
       nnEditLogRoller.start();
 
       cacheManager.activate();
-      blockManager.getDatanodeManager().setSendCachingCommands(true);
+      blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
     } finally {
       writeUnlock();
       startingActiveService = false;
@@ -1060,7 +1060,7 @@ public class FSNamesystem implements Nam
         dir.fsImage.updateLastAppliedTxIdFromWritten();
       }
       cacheManager.deactivate();
-      blockManager.getDatanodeManager().setSendCachingCommands(false);
+      blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
     } finally {
       writeUnlock();
     }
@@ -1297,6 +1297,14 @@ public class FSNamesystem implements Nam
     return hasReadLock() || hasWriteLock();
   }
 
+  public int getReadHoldCount() {
+    return this.fsLock.getReadHoldCount();
+  }
+
+  public int getWriteHoldCount() {
+    return this.fsLock.getWriteHoldCount();
+  }
+
   NamespaceInfo getNamespaceInfo() {
     readLock();
     try {
@@ -3305,6 +3313,18 @@ public class FSNamesystem implements Nam
       return;
     }
     
+    removeBlocksAndUpdateSafemodeTotal(blocks);
+  }
+
+  /**
+   * Removes the blocks from blocksmap and updates the safemode blocks total
+   * 
+   * @param blocks
+   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
+   *          of blocks that need to be removed from blocksMap
+   */
+  void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
+    assert hasWriteLock();
     // In the case that we are a Standby tailing edits from the
     // active while in safe-mode, we need to track the total number
     // of blocks and safe blocks in the system.
@@ -3325,9 +3345,9 @@ public class FSNamesystem implements Nam
     }
     if (trackBlockCounts) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
-            "decreasing safeBlocks by " + numRemovedSafe +
-            ", totalBlocks by " + numRemovedComplete);
+        LOG.debug("Adjusting safe-mode totals for deletion."
+            + "decreasing safeBlocks by " + numRemovedSafe
+            + ", totalBlocks by " + numRemovedComplete);
       }
       adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
     }
@@ -5883,8 +5903,8 @@ public class FSNamesystem implements Nam
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
     blockinfo.setNumBytes(newBlock.getNumBytes());
+    blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects
     final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
@@ -6953,6 +6973,7 @@ public class FSNamesystem implements Nam
       return; // Return previous response
     }
     boolean success = false;
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -6961,7 +6982,6 @@ public class FSNamesystem implements Nam
         checkOwner(pc, snapshotRoot);
       }
 
-      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
       List<INode> removedINodes = new ChunkedArrayList<INode>();
       dir.writeLock();
       try {
@@ -6972,8 +6992,6 @@ public class FSNamesystem implements Nam
         dir.writeUnlock();
       }
       removedINodes.clear();
-      this.removeBlocks(collectedBlocks);
-      collectedBlocks.clear();
       getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
           cacheEntry != null);
       success = true;
@@ -6982,7 +7000,10 @@ public class FSNamesystem implements Nam
       RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
-    
+
+    removeBlocks(collectedBlocks);
+    collectedBlocks.clear();
+
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
       logAuditEvent(true, "deleteSnapshot", rootPath, null, null);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Nov 14 23:56:56 2013
@@ -371,10 +371,18 @@ public abstract class INode implements I
   public abstract void destroyAndCollectBlocks(
       BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
 
-  /** Compute {@link ContentSummary}. */
+  /** Compute {@link ContentSummary}. Blocking call */
   public final ContentSummary computeContentSummary() {
-    final Content.Counts counts = computeContentSummary(
-        Content.Counts.newInstance());
+    return computeAndConvertContentSummary(
+        new ContentSummaryComputationContext());
+  }
+
+  /**
+   * Compute {@link ContentSummary}. 
+   */
+  public final ContentSummary computeAndConvertContentSummary(
+      ContentSummaryComputationContext summary) {
+    Content.Counts counts = computeContentSummary(summary).getCounts();
     return new ContentSummary(counts.get(Content.LENGTH),
         counts.get(Content.FILE) + counts.get(Content.SYMLINK),
         counts.get(Content.DIRECTORY), getNsQuota(),
@@ -384,10 +392,12 @@ public abstract class INode implements I
   /**
    * Count subtree content summary with a {@link Content.Counts}.
    *
-   * @param counts The subtree counts for returning.
-   * @return The same objects as the counts parameter.
+   * @param summary the context object holding counts for the subtree.
+   * @return The same objects as summary.
    */
-  public abstract Content.Counts computeContentSummary(Content.Counts counts);
+  public abstract ContentSummaryComputationContext computeContentSummary(
+      ContentSummaryComputationContext summary);
+
   
   /**
    * Check and add namespace/diskspace consumed to itself and the ancestors.

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Thu Nov 14 23:56:56 2013
@@ -466,12 +466,45 @@ public class INodeDirectory extends INod
   }
 
   @Override
-  public Content.Counts computeContentSummary(final Content.Counts counts) {
-    for (INode child : getChildrenList(null)) {
-      child.computeContentSummary(counts);
+  public ContentSummaryComputationContext computeContentSummary(
+      ContentSummaryComputationContext summary) {
+    ReadOnlyList<INode> childrenList = getChildrenList(null);
+    // Explicit traversing is done to enable repositioning after relinquishing
+    // and reacquiring locks.
+    for (int i = 0;  i < childrenList.size(); i++) {
+      INode child = childrenList.get(i);
+      byte[] childName = child.getLocalNameBytes();
+
+      long lastYieldCount = summary.getYieldCount();
+      child.computeContentSummary(summary);
+
+      // Check whether the computation was paused in the subtree.
+      // The counts may be off, but traversing the rest of children
+      // should be made safe.
+      if (lastYieldCount == summary.getYieldCount()) {
+        continue;
+      }
+
+      // The locks were released and reacquired. Check parent first.
+      if (getParent() == null) {
+        // Stop further counting and return whatever we have so far.
+        break;
+      }
+
+      // Obtain the children list again since it may have been modified.
+      childrenList = getChildrenList(null);
+      // Reposition in case the children list is changed. Decrement by 1
+      // since it will be incremented when loops.
+      i = nextChild(childrenList, childName) - 1;
     }
-    counts.add(Content.DIRECTORY, 1);
-    return counts;
+
+    // Increment the directory count for this directory.
+    summary.getCounts().add(Content.DIRECTORY, 1);
+
+    // Relinquish and reacquire locks if necessary.
+    summary.yield();
+
+    return summary;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu Nov 14 23:56:56 2013
@@ -107,12 +107,16 @@ public class INodeDirectoryWithQuota ext
   }
 
   @Override
-  public Content.Counts computeContentSummary(
-      final Content.Counts counts) {
-    final long original = counts.get(Content.DISKSPACE);
-    super.computeContentSummary(counts);
-    checkDiskspace(counts.get(Content.DISKSPACE) - original);
-    return counts;
+  public ContentSummaryComputationContext computeContentSummary(
+      final ContentSummaryComputationContext summary) {
+    final long original = summary.getCounts().get(Content.DISKSPACE);
+    long oldYieldCount = summary.getYieldCount();
+    super.computeContentSummary(summary);
+    // Check only when the content has not changed in the middle.
+    if (oldYieldCount == summary.getYieldCount()) {
+      checkDiskspace(summary.getCounts().get(Content.DISKSPACE) - original);
+    }
+    return summary;
   }
   
   private void checkDiskspace(final long computed) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Nov 14 23:56:56 2013
@@ -342,11 +342,11 @@ public class INodeFile extends INodeWith
   }
 
   @Override
-  public final Content.Counts computeContentSummary(
-      final Content.Counts counts) {
-    computeContentSummary4Snapshot(counts);
-    computeContentSummary4Current(counts);
-    return counts;
+  public final ContentSummaryComputationContext  computeContentSummary(
+      final ContentSummaryComputationContext summary) {
+    computeContentSummary4Snapshot(summary.getCounts());
+    computeContentSummary4Current(summary.getCounts());
+    return summary;
   }
 
   private void computeContentSummary4Snapshot(final Content.Counts counts) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Thu Nov 14 23:56:56 2013
@@ -107,7 +107,8 @@ public class INodeMap {
       }
       
       @Override
-      public Content.Counts computeContentSummary(Content.Counts counts) {
+      public ContentSummaryComputationContext computeContentSummary(
+          ContentSummaryComputationContext summary) {
         return null;
       }
       

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Thu Nov 14 23:56:56 2013
@@ -278,8 +278,9 @@ public abstract class INodeReference ext
   }
 
   @Override
-  public Content.Counts computeContentSummary(Content.Counts counts) {
-    return referred.computeContentSummary(counts);
+  public ContentSummaryComputationContext computeContentSummary(
+      ContentSummaryComputationContext summary) {
+    return referred.computeContentSummary(summary);
   }
 
   @Override
@@ -444,12 +445,13 @@ public abstract class INodeReference ext
     }
     
     @Override
-    public final Content.Counts computeContentSummary(Content.Counts counts) {
+    public final ContentSummaryComputationContext computeContentSummary(
+        ContentSummaryComputationContext summary) {
       //only count diskspace for WithName
       final Quota.Counts q = Quota.Counts.newInstance();
       computeQuotaUsage(q, false, lastSnapshotId);
-      counts.add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
-      return counts;
+      summary.getCounts().add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
+      return summary;
     }
 
     @Override
@@ -688,4 +690,4 @@ public abstract class INodeReference ext
       }
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Thu Nov 14 23:56:56 2013
@@ -98,9 +98,10 @@ public class INodeSymlink extends INodeW
   }
 
   @Override
-  public Content.Counts computeContentSummary(final Content.Counts counts) {
-    counts.add(Content.SYMLINK, 1);
-    return counts;
+  public ContentSummaryComputationContext computeContentSummary(
+      final ContentSummaryComputationContext summary) {
+    summary.getCounts().add(Content.SYMLINK, 1);
+    return summary;
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Thu Nov 14 23:56:56 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -342,11 +343,12 @@ public class INodeDirectorySnapshottable
   }
   
   @Override
-  public Content.Counts computeContentSummary(final Content.Counts counts) {
-    super.computeContentSummary(counts);
-    counts.add(Content.SNAPSHOT, snapshotsByNames.size());
-    counts.add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
-    return counts;
+  public ContentSummaryComputationContext computeContentSummary(
+      final ContentSummaryComputationContext summary) {
+    super.computeContentSummary(summary);
+    summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
+    summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+    return summary;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Thu Nov 14 23:56:56 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.Q
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -883,18 +884,27 @@ public class INodeDirectoryWithSnapshot 
   }
 
   @Override
-  public Content.Counts computeContentSummary(final Content.Counts counts) {
-    super.computeContentSummary(counts);
-    computeContentSummary4Snapshot(counts);
-    return counts;
+  public ContentSummaryComputationContext computeContentSummary(
+      final ContentSummaryComputationContext summary) {
+    // Snapshot summary calc won't be relinquishing locks in the middle.
+    // Do this first and handover to parent.
+    computeContentSummary4Snapshot(summary.getCounts());
+    super.computeContentSummary(summary);
+    return summary;
   }
 
   private void computeContentSummary4Snapshot(final Content.Counts counts) {
+    // Create a new blank summary context for blocking processing of subtree.
+    ContentSummaryComputationContext summary = 
+        new ContentSummaryComputationContext();
     for(DirectoryDiff d : diffs) {
       for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
-        deleted.computeContentSummary(counts);
+        deleted.computeContentSummary(summary);
       }
     }
+    // Add the counts from deleted trees.
+    counts.add(summary.getCounts());
+    // Add the deleted directory count.
     counts.add(Content.DIRECTORY, diffs.asList().size());
   }
   

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Thu Nov 14 23:56:56 2013
@@ -26,8 +26,8 @@ import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
-import java.net.URLConnection;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Date;
@@ -47,11 +47,14 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
 import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -142,11 +145,11 @@ public class DelegationTokenFetcher {
     // default to using the local file system
     FileSystem local = FileSystem.getLocal(conf);
     final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
+    final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
 
     // Login the current user
     UserGroupInformation.getCurrentUser().doAs(
         new PrivilegedExceptionAction<Object>() {
-          @SuppressWarnings("unchecked")
           @Override
           public Object run() throws Exception {
 
@@ -182,7 +185,8 @@ public class DelegationTokenFetcher {
             } else {
               // otherwise we are fetching
               if (webUrl != null) {
-                Credentials creds = getDTfromRemote(webUrl, renewer);
+                Credentials creds = getDTfromRemote(connectionFactory, new URI(webUrl),
+                    renewer);
                 creds.writeTokenStorageFile(tokenFile, conf);
                 for (Token<?> token : creds.getAllTokens()) {
                   if(LOG.isDebugEnabled()) {	
@@ -208,32 +212,31 @@ public class DelegationTokenFetcher {
         });
   }
   
-  static public Credentials getDTfromRemote(String nnAddr, 
-      String renewer) throws IOException {
+  static public Credentials getDTfromRemote(URLConnectionFactory factory,
+      URI nnUri, String renewer) throws IOException {
+    StringBuilder buf = new StringBuilder(nnUri.toString())
+        .append(GetDelegationTokenServlet.PATH_SPEC);
+    if (renewer != null) {
+      buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
+          .append(renewer);
+    }
+
+    HttpURLConnection conn = null;
     DataInputStream dis = null;
-    InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
-    
+    InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
+        .getAuthority());
+
     try {
-      StringBuffer url = new StringBuffer();
-      if (renewer != null) {
-        url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC)
-           .append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
-           .append(renewer);
-      } else {
-        url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
-      }
-      
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Retrieving token from: " + url);
+        LOG.debug("Retrieving token from: " + buf);
       }
-      
-      URL remoteURL = new URL(url.toString());
-      URLConnection connection = SecurityUtil.openSecureHttpConnection(remoteURL);
-      InputStream in = connection.getInputStream();
+
+      conn = run(factory, new URL(buf.toString()));
+      InputStream in = conn.getInputStream();
       Credentials ts = new Credentials();
       dis = new DataInputStream(in);
       ts.readFields(dis);
-      for(Token<?> token: ts.getAllTokens()) {
+      for (Token<?> token : ts.getAllTokens()) {
         token.setKind(HftpFileSystem.TOKEN_KIND);
         SecurityUtil.setTokenService(token, serviceAddr);
       }
@@ -241,53 +244,70 @@ public class DelegationTokenFetcher {
     } catch (Exception e) {
       throw new IOException("Unable to obtain remote token", e);
     } finally {
-      if(dis != null) dis.close();
+      IOUtils.cleanup(LOG, dis);
+      if (conn != null) {
+        conn.disconnect();
+      }
     }
   }
 
   /**
+   * Cancel a Delegation Token.
+   * @param nnAddr the NameNode's address
+   * @param tok the token to cancel
+   * @throws IOException
+   * @throws AuthenticationException
+   */
+  static public void cancelDelegationToken(URLConnectionFactory factory,
+      URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
+      AuthenticationException {
+    StringBuilder buf = new StringBuilder(nnAddr.toString())
+        .append(CancelDelegationTokenServlet.PATH_SPEC).append("?")
+        .append(CancelDelegationTokenServlet.TOKEN).append("=")
+        .append(tok.encodeToUrlString());
+    HttpURLConnection conn = run(factory, new URL(buf.toString()));
+    conn.disconnect();
+  }
+
+  /**
    * Renew a Delegation Token.
    * @param nnAddr the NameNode's address
    * @param tok the token to renew
    * @return the Date that the token will expire next.
    * @throws IOException
+   * @throws AuthenticationException
    */
-  static public long renewDelegationToken(String nnAddr,
-      Token<DelegationTokenIdentifier> tok
-  ) throws IOException {
-    StringBuilder buf = new StringBuilder();
-    buf.append(nnAddr);
-    buf.append(RenewDelegationTokenServlet.PATH_SPEC);
-    buf.append("?");
-    buf.append(RenewDelegationTokenServlet.TOKEN);
-    buf.append("=");
-    buf.append(tok.encodeToUrlString());
-    BufferedReader in = null;
+  static public long renewDelegationToken(URLConnectionFactory factory,
+      URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
+      AuthenticationException {
+    StringBuilder buf = new StringBuilder(nnAddr.toString())
+        .append(RenewDelegationTokenServlet.PATH_SPEC).append("?")
+        .append(RenewDelegationTokenServlet.TOKEN).append("=")
+        .append(tok.encodeToUrlString());
+
     HttpURLConnection connection = null;
-    
+    BufferedReader in = null;
     try {
-      URL url = new URL(buf.toString());
-      connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new IOException("Error renewing token: " + 
-            connection.getResponseMessage());
-      }
-      in = new BufferedReader(
-          new InputStreamReader(connection.getInputStream(), Charsets.UTF_8));
+      connection = run(factory, new URL(buf.toString()));
+      in = new BufferedReader(new InputStreamReader(
+          connection.getInputStream(), Charsets.UTF_8));
       long result = Long.parseLong(in.readLine());
-      in.close();
       return result;
     } catch (IOException ie) {
       LOG.info("error in renew over HTTP", ie);
       IOException e = getExceptionFromResponse(connection);
 
-      IOUtils.cleanup(LOG, in);
-      if(e!=null) {
-        LOG.info("rethrowing exception from HTTP request: " + 
-                 e.getLocalizedMessage());
+      if (e != null) {
+        LOG.info("rethrowing exception from HTTP request: "
+            + e.getLocalizedMessage());
         throw e;
       }
       throw ie;
+    } finally {
+      IOUtils.cleanup(LOG, in);
+      if (connection != null) {
+        connection.disconnect();
+      }
     }
   }
 
@@ -339,43 +359,28 @@ public class DelegationTokenFetcher {
     return e;
   }
 
-  
-  /**
-   * Cancel a Delegation Token.
-   * @param nnAddr the NameNode's address
-   * @param tok the token to cancel
-   * @throws IOException
-   */
-  static public void cancelDelegationToken(String nnAddr,
-      Token<DelegationTokenIdentifier> tok
-  ) throws IOException {
-    StringBuilder buf = new StringBuilder();
-    buf.append(nnAddr);
-    buf.append(CancelDelegationTokenServlet.PATH_SPEC);
-    buf.append("?");
-    buf.append(CancelDelegationTokenServlet.TOKEN);
-    buf.append("=");
-    buf.append(tok.encodeToUrlString());
-    BufferedReader in = null;
-    HttpURLConnection connection=null;
+  private static HttpURLConnection run(URLConnectionFactory factory, URL url)
+      throws IOException, AuthenticationException {
+    HttpURLConnection conn = null;
+
     try {
-      URL url = new URL(buf.toString());
-      connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new IOException("Error cancelling token: " + 
-            connection.getResponseMessage());
+      conn = (HttpURLConnection) factory.openConnection(url, true);
+      if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        String msg = conn.getResponseMessage();
+
+        throw new IOException("Error when dealing remote token: " + msg);
       }
     } catch (IOException ie) {
-      LOG.info("error in cancel over HTTP", ie);
-      IOException e = getExceptionFromResponse(connection);
+      LOG.info("Error when dealing remote token:", ie);
+      IOException e = getExceptionFromResponse(conn);
 
-      IOUtils.cleanup(LOG, in);
-      if(e!=null) {
-        LOG.info("rethrowing exception from HTTP request: " + 
-                 e.getLocalizedMessage());
+      if (e != null) {
+        LOG.info("rethrowing exception from HTTP request: "
+            + e.getLocalizedMessage());
         throw e;
       }
       throw ie;
     }
+    return conn;
   }
 }