Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [13/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Aug 19 23:49:39 2014
@@ -98,7 +98,6 @@ public class BlockMetadataHeader {
 
   /**
    * This reads all the fields till the beginning of checksum.
-   * @param in 
    * @return Metadata Header
    * @throws IOException
    */
@@ -109,9 +108,7 @@ public class BlockMetadataHeader {
   /**
    * Reads header at the top of metadata file and returns the header.
    * 
-   * @param dataset
-   * @param block
-   * @return
+   * @return metadata header for the block
    * @throws IOException
    */
   public static BlockMetadataHeader readHeader(File file) throws IOException {
@@ -147,8 +144,6 @@ public class BlockMetadataHeader {
   /**
    * This writes all the fields till the beginning of checksum.
    * @param out DataOutputStream
-   * @param header 
-   * @return 
    * @throws IOException
    */
   @VisibleForTesting
@@ -161,9 +156,7 @@ public class BlockMetadataHeader {
   
   /**
    * Writes all the fields till the beginning of checksum.
-   * @param out
-   * @param checksum
-   * @throws IOException
+   * @throws IOException on error
    */
   static void writeHeader(DataOutputStream out, DataChecksum checksum)
                          throws IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Tue Aug 19 23:49:39 2014
@@ -88,7 +88,11 @@ class BlockPoolManager {
   
   synchronized void remove(BPOfferService t) {
     offerServices.remove(t);
-    bpByBlockPoolId.remove(t.getBlockPoolId());
+    if (t.hasBlockPoolId()) {
+      // It's possible that the block pool never successfully registered
+      // with any NN, so it was never added to this map
+      bpByBlockPoolId.remove(t.getBlockPoolId());
+    }
     
     boolean removed = false;
     for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Aug 19 23:49:39 2014
@@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
   
   private final SortedSet<BlockScanInfo> blockInfoSet
       = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
+  private final SortedSet<BlockScanInfo> newBlockInfoSet =
+      new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
   private final GSet<Block, BlockScanInfo> blockMap
       = new LightWeightGSet<Block, BlockScanInfo>(
           LightWeightGSet.computeCapacity(0.5, "BlockMap"));
@@ -97,7 +101,7 @@ class BlockPoolSliceScanner {
   private long totalTransientErrors = 0;
   private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
   
-  private long currentPeriodStart = Time.now();
+  private long currentPeriodStart = Time.monotonicNow();
   private long bytesLeft = 0; // Bytes to scan in this period
   private long totalBytesToScan = 0;
   private boolean isNewPeriod = true;
@@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
       BlockScanInfo info = new BlockScanInfo( block );
       info.lastScanTime = scanTime--; 
       //still keep 'info.lastScanType' to NONE.
-      addBlockInfo(info);
+      addBlockInfo(info, false);
     }
 
     RollingLogs rollingLogs = null;
@@ -221,25 +225,42 @@ class BlockPoolSliceScanner {
     // Should we change throttler bandwidth every time bytesLeft changes?
     // not really required.
   }
-  
-  private synchronized void addBlockInfo(BlockScanInfo info) {
-    boolean added = blockInfoSet.add(info);
+
+  /**
+   * Adds the BlockScanInfo to the sorted set of BlockScanInfo objects.
+   * @param info BlockScanInfo to be added
+   * @param isNewBlock true if the block is newly added, false if an existing
+   *          BlockScanInfo is being updated with a new scanTime
+   */
+  private synchronized void addBlockInfo(BlockScanInfo info,
+      boolean isNewBlock) {
+    boolean added = false;
+    if (isNewBlock) {
+      // check whether the block is already present
+      boolean exists = blockInfoSet.contains(info);
+      added = !exists && newBlockInfoSet.add(info);
+    } else {
+      added = blockInfoSet.add(info);
+    }
     blockMap.put(info);
     
     if (added) {
       updateBytesToScan(info.getNumBytes(), info.lastScanTime);
     }
   }
-  
+
   private synchronized void delBlockInfo(BlockScanInfo info) {
     boolean exists = blockInfoSet.remove(info);
+    if (!exists) {
+      exists = newBlockInfoSet.remove(info);
+    }
     blockMap.remove(info);
 
     if (exists) {
       updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
     }
   }
-  
+
   /** Update blockMap by the given LogEntry */
   private synchronized void updateBlockInfo(LogEntry e) {
     BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
@@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
       delBlockInfo(info);
       info.lastScanTime = e.verificationTime;
       info.lastScanType = ScanType.VERIFICATION_SCAN;
-      addBlockInfo(info);
+      addBlockInfo(info, false);
     }
   }
 
@@ -260,7 +281,7 @@ class BlockPoolSliceScanner {
     long period = Math.min(scanPeriod, 
                            Math.max(blockMap.size(),1) * 600 * 1000L);
     int periodInt = Math.abs((int)period);
-    return Time.now() - scanPeriod + 
+    return Time.monotonicNow() - scanPeriod +
         DFSUtil.getRandom().nextInt(periodInt);
   }
 
@@ -275,14 +296,14 @@ class BlockPoolSliceScanner {
     info = new BlockScanInfo(block.getLocalBlock());    
     info.lastScanTime = getNewBlockScanTime();
     
-    addBlockInfo(info);
+    addBlockInfo(info, true);
     adjustThrottler();
   }
   
   /** Deletes the block from internal structures */
   synchronized void deleteBlock(Block block) {
     BlockScanInfo info = blockMap.get(block);
-    if ( info != null ) {
+    if (info != null) {
       delBlockInfo(info);
     }
   }
@@ -310,23 +331,16 @@ class BlockPoolSliceScanner {
     }
   }
   
-  private synchronized void updateScanStatus(Block block, 
+  private synchronized void updateScanStatus(BlockScanInfo info,
                                              ScanType type,
                                              boolean scanOk) {
-    BlockScanInfo info = blockMap.get(block);
-    
-    if ( info != null ) {
-      delBlockInfo(info);
-    } else {
-      // It might already be removed. Thats ok, it will be caught next time.
-      info = new BlockScanInfo(block);
-    }
-    
-    long now = Time.now();
+    delBlockInfo(info);
+
+    long now = Time.monotonicNow();
     info.lastScanType = type;
     info.lastScanTime = now;
     info.lastScanOk = scanOk;
-    addBlockInfo(info);
+    addBlockInfo(info, false);
         
     // Don't update meta data if the verification failed.
     if (!scanOk) {
@@ -334,8 +348,8 @@ class BlockPoolSliceScanner {
     }
     
     if (verificationLog != null) {
-      verificationLog.append(now, block.getGenerationStamp(),
-          block.getBlockId());
+      verificationLog.append(now, info.getGenerationStamp(),
+          info.getBlockId());
     }
   }
   
@@ -399,8 +413,9 @@ class BlockPoolSliceScanner {
   }
   
   private synchronized void adjustThrottler() {
-    long timeLeft = currentPeriodStart+scanPeriod - Time.now();
-    long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
+    long timeLeft = Math.max(1L,
+        currentPeriodStart + scanPeriod - Time.monotonicNow());
+    long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
     throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
   }
   
@@ -433,11 +448,13 @@ class BlockPoolSliceScanner {
           totalTransientErrors++;
         }
         
-        updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
+        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+            ScanType.VERIFICATION_SCAN, true);
 
         return;
       } catch (IOException e) {
-        updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
+        updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+            ScanType.VERIFICATION_SCAN, false);
 
         // If the block does not exists anymore, then its not an error
         if (!dataset.contains(block)) {
@@ -496,7 +513,7 @@ class BlockPoolSliceScanner {
   
   // Picks one block and verifies it
   private void verifyFirstBlock() {
-    Block block = null;
+    BlockScanInfo block = null;
     synchronized (this) {
       if (!blockInfoSet.isEmpty()) {
         block = blockInfoSet.first();
@@ -523,7 +540,7 @@ class BlockPoolSliceScanner {
   private boolean assignInitialVerificationTimes() {
     //First updates the last verification times from the log file.
     if (verificationLog != null) {
-      long now = Time.now();
+      long now = Time.monotonicNow();
       RollingLogs.LineIterator logIterator = null;
       try {
         logIterator = verificationLog.logs.iterator(false);
@@ -574,7 +591,7 @@ class BlockPoolSliceScanner {
       // Initially spread the block reads over half of scan period
       // so that we don't keep scanning the blocks too quickly when restarted.
       long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
-      long lastScanTime = Time.now() - scanPeriod;
+      long lastScanTime = Time.monotonicNow() - scanPeriod;
 
       if (!blockInfoSet.isEmpty()) {
         BlockScanInfo info;
@@ -582,7 +599,7 @@ class BlockPoolSliceScanner {
           delBlockInfo(info);        
           info.lastScanTime = lastScanTime;
           lastScanTime += verifyInterval;
-          addBlockInfo(info);
+          addBlockInfo(info, false);
         }
       }
     }
@@ -601,16 +618,16 @@ class BlockPoolSliceScanner {
 
     // reset the byte counts :
     bytesLeft = totalBytesToScan;
-    currentPeriodStart = Time.now();
+    currentPeriodStart = Time.monotonicNow();
     isNewPeriod = true;
   }
   
   private synchronized boolean workRemainingInCurrentPeriod() {
-    if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+    if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
                   currentPeriodStart + ", period=" + scanPeriod + ", now=" +
-                  Time.now() + " " + blockPoolId);
+                  Time.monotonicNow() + " " + blockPoolId);
       }
       return false;
     } else {
@@ -633,7 +650,7 @@ class BlockPoolSliceScanner {
       scan();
     } finally {
       totalBlocksScannedInLastRun.set(processedBlocks.size());
-      lastScanTime.set(Time.now());
+      lastScanTime.set(Time.monotonicNow());
     }
   }
 
@@ -656,7 +673,7 @@ class BlockPoolSliceScanner {
       while (datanode.shouldRun
           && !datanode.blockScanner.blockScannerThread.isInterrupted()
           && datanode.isBPServiceAlive(blockPoolId)) {
-        long now = Time.now();
+        long now = Time.monotonicNow();
         synchronized (this) {
           if ( now >= (currentPeriodStart + scanPeriod)) {
             startNewPeriod();
@@ -678,12 +695,21 @@ class BlockPoolSliceScanner {
       throw e;
     } finally {
       rollVerificationLogs();
+      rollNewBlocksInfo();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Done scanning block pool: " + blockPoolId);
       }
     }
   }
-  
+
+  // add new blocks to be scanned in the next iteration
+  private synchronized void rollNewBlocksInfo() {
+    for (BlockScanInfo newBlock : newBlockInfoSet) {
+      blockInfoSet.add(newBlock);
+    }
+    newBlockInfoSet.clear();
+  }
+
   private synchronized void rollVerificationLogs() {
     if (verificationLog != null) {
       try {
@@ -714,7 +740,7 @@ class BlockPoolSliceScanner {
     
     int total = blockInfoSet.size();
     
-    long now = Time.now();
+    long now = Time.monotonicNow();
     
     Date date = new Date();
     

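For context on the BlockPoolSliceScanner change above: newly added blocks are now staged in a separate sorted set (newBlockInfoSet) and only merged into the main blockInfoSet by rollNewBlocksInfo() once a scan pass completes, so a burst of new blocks cannot jump ahead of blocks already waiting for verification. A minimal, self-contained sketch of that staging pattern follows; the class and method names are illustrative, not the committed code.

import java.util.Comparator;
import java.util.TreeSet;

// Illustrative sketch only: stage entries added mid-pass and merge them into
// the working set between passes, mirroring addBlockInfo()/rollNewBlocksInfo().
class StagedScanSet<T> {
  private final TreeSet<T> current;  // entries eligible for the current pass
  private final TreeSet<T> staged;   // entries added during the current pass

  StagedScanSet(Comparator<T> order) {
    this.current = new TreeSet<T>(order);
    this.staged = new TreeSet<T>(order);
  }

  synchronized void add(T entry, boolean isNew) {
    if (isNew) {
      // New entries wait for the next pass unless already tracked.
      if (!current.contains(entry)) {
        staged.add(entry);
      }
    } else {
      current.add(entry);
    }
  }

  synchronized boolean remove(T entry) {
    // An entry may live in either set, so try both.
    return current.remove(entry) || staged.remove(entry);
  }

  synchronized T first() {
    return current.isEmpty() ? null : current.first();
  }

  // Called once a scan pass completes, like rollNewBlocksInfo().
  synchronized void rollStagedEntries() {
    current.addAll(staged);
    staged.clear();
  }
}

Keeping the merge under the same lock that guards add and remove, as the committed code does, means an entry is never double-counted by the bytes-to-scan accounting.
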
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 19 23:49:39 2014
@@ -36,8 +36,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten
   void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
     // 1. For each BP data directory analyze the state and
     // check whether all is consistent before transitioning.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {
@@ -152,7 +163,7 @@ public class BlockPoolSliceStorage exten
     // During startup some of them can upgrade or roll back
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert getCTime() == nsInfo.getCTime() 
           : "Data-node and name-node CTimes must be the same.";
     }
@@ -242,7 +253,7 @@ public class BlockPoolSliceStorage exten
    * @param startOpt startup option
    * @throws IOException
    */
-  private void doTransition(StorageDirectory sd,
+  private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
@@ -275,7 +286,7 @@ public class BlockPoolSliceStorage exten
     }
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(sd, nsInfo); // upgrade
+      doUpgrade(datanode, sd, nsInfo); // upgrade
       return;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -304,7 +315,8 @@ public class BlockPoolSliceStorage exten
    * @param nsInfo Namespace Info from the namenode
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
+      throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -312,7 +324,7 @@ public class BlockPoolSliceStorage exten
     }
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
         + ".\n   old LV = " + this.getLayoutVersion() + "; old CTime = "
-        + this.getCTime() + ".\n   new LV = " + nsInfo.getLayoutVersion()
+        + this.getCTime() + ".\n   new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
         + "; new CTime = " + nsInfo.getCTime());
     // get <SD>/previous directory
     String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@@ -340,7 +352,7 @@ public class BlockPoolSliceStorage exten
     rename(bpCurDir, bpTmpDir);
     
     // 3. Create new <SD>/current with block files hardlinks and VERSION
-    linkAllBlocks(bpTmpDir, bpCurDir);
+    linkAllBlocks(datanode, bpTmpDir, bpCurDir);
     this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() 
         : "Data-node and name-node layout versions must be the same.";
@@ -517,14 +529,15 @@ public class BlockPoolSliceStorage exten
    * @param toDir the current data directory
    * @throws IOException if error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
+      throws IOException {
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     // hardlink finalized blocks in tmpDir
     HardLink hardLink = new HardLink();
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), 
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
       new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW), 
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
         new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     LOG.info( hardLink.linkStats.report() );
   }

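The recoverTransitionRead() change above keeps the existing storage directories and filters the incoming dataDirs by absolute path, instead of rebuilding storageDirs from scratch, so a directory that was already loaded is not analyzed and added twice. A hedged sketch of that filtering step; the helper class and signature are invented for illustration.

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

class StorageDirFilter {
  /**
   * Removes candidate directories whose absolute path is already in use.
   * Mirrors the existingStorageDirs check added to recoverTransitionRead().
   */
  static void removeAlreadyUsedDirs(Collection<File> alreadyUsed,
      Collection<File> candidates) {
    Set<String> existing = new HashSet<String>();
    for (File dir : alreadyUsed) {
      existing.add(dir.getAbsolutePath());
    }
    for (Iterator<File> it = candidates.iterator(); it.hasNext();) {
      if (existing.contains(it.next().getAbsolutePath())) {
        it.remove(); // already tracked; skip re-analysis and re-adding
      }
    }
  }
}
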
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug 19 23:49:39 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -69,7 +71,7 @@ class BlockReceiver implements Closeable
 
   @VisibleForTesting
   static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
-  
+  private final long datanodeSlowLogThresholdMs;
   private DataInputStream in = null; // from where data are read
   private DataChecksum clientChecksum; // checksum used by client
   private DataChecksum diskChecksum; // checksum we write to disk
@@ -86,8 +88,7 @@ class BlockReceiver implements Closeable
   private int bytesPerChecksum;
   private int checksumSize;
   
-  private final PacketReceiver packetReceiver =
-      new PacketReceiver(false);
+  private final PacketReceiver packetReceiver = new PacketReceiver(false);
   
   protected final String inAddr;
   protected final String myAddr;
@@ -105,6 +106,7 @@ class BlockReceiver implements Closeable
   private boolean dropCacheBehindWrites;
   private long lastCacheManagementOffset = 0;
   private boolean syncBehindWrites;
+  private boolean syncBehindWritesInBackground;
 
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
@@ -122,7 +124,16 @@ class BlockReceiver implements Closeable
   private boolean syncOnClose;
   private long restartBudget;
 
-  BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+  /**
+   * Fields used to send periodic in-progress responses for replaceBlock().
+   */
+  private final long responseInterval;
+  private long lastResponseTime = 0;
+  private boolean isReplaceBlock = false;
+  private DataOutputStream replyOut = null;
+
+  BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+      final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
@@ -141,7 +152,10 @@ class BlockReceiver implements Closeable
       this.isDatanode = clientname.length() == 0;
       this.isClient = !this.isDatanode;
       this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
-
+      this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
+      // For replaceBlock() calls, a response should be sent periodically to
+      // avoid socket timeouts at clients, so send at an interval of 0.5 * socketTimeout
+      this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
       //for datanode, we have
       //1: clientName.length() == 0, and
       //2: stage == null or PIPELINE_SETUP_CREATE
@@ -162,11 +176,11 @@ class BlockReceiver implements Closeable
       // Open local disk out
       //
       if (isDatanode) { //replication or move
-        replicaInfo = datanode.data.createTemporary(block);
+        replicaInfo = datanode.data.createTemporary(storageType, block);
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(block);
+          replicaInfo = datanode.data.createRbw(storageType, block);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaInfo.getStorageUuid());
           break;
@@ -198,7 +212,7 @@ class BlockReceiver implements Closeable
         case TRANSFER_RBW:
         case TRANSFER_FINALIZED:
           // this is a transfer destination
-          replicaInfo = datanode.data.createTemporary(block);
+          replicaInfo = datanode.data.createTemporary(storageType, block);
           break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
@@ -208,6 +222,8 @@ class BlockReceiver implements Closeable
         datanode.getDnConf().dropCacheBehindWrites :
           cachingStrategy.getDropBehind();
       this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
+      this.syncBehindWritesInBackground = datanode.getDnConf().
+          syncBehindWritesInBackground;
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -249,7 +265,7 @@ class BlockReceiver implements Closeable
       
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError(ioe); // may throw an exception here
+        datanode.checkDiskErrorAsync();
       }
       
       throw ioe;
@@ -268,10 +284,8 @@ class BlockReceiver implements Closeable
    */
   @Override
   public void close() throws IOException {
-    if (packetReceiver != null) {
-      packetReceiver.close();
-    }
-    
+    packetReceiver.close();
+
     IOException ioe = null;
     if (syncOnClose && (out != null || checksumOut != null)) {
       datanode.metrics.incrFsyncCount();      
@@ -327,7 +341,7 @@ class BlockReceiver implements Closeable
     }
     // disk check
     if(ioe != null) {
-      datanode.checkDiskError(ioe);
+      datanode.checkDiskErrorAsync();
       throw ioe;
     }
   }
@@ -338,6 +352,7 @@ class BlockReceiver implements Closeable
    */
   void flushOrSync(boolean isSync) throws IOException {
     long flushTotalNanos = 0;
+    long begin = Time.monotonicNow();
     if (checksumOut != null) {
       long flushStartNanos = System.nanoTime();
       checksumOut.flush();
@@ -366,6 +381,12 @@ class BlockReceiver implements Closeable
     	  datanode.metrics.incrFsyncCount();      
       }
     }
+    long duration = Time.monotonicNow() - begin;
+    if (duration > datanodeSlowLogThresholdMs) {
+      LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+          + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+          + flushTotalNanos + "ns");
+    }
   }
 
   /**
@@ -491,8 +512,14 @@ class BlockReceiver implements Closeable
     //First write the packet to the mirror:
     if (mirrorOut != null && !mirrorError) {
       try {
+        long begin = Time.monotonicNow();
         packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
+        long duration = Time.monotonicNow() - begin;
+        if (duration > datanodeSlowLogThresholdMs) {
+          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+        }
       } catch (IOException e) {
         handleMirrorOutError(e);
       }
@@ -575,7 +602,13 @@ class BlockReceiver implements Closeable
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
           
           // Write data to disk.
+          long begin = Time.monotonicNow();
           out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
+          long duration = Time.monotonicNow() - begin;
+          if (duration > datanodeSlowLogThresholdMs) {
+            LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+                + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+          }
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -618,7 +651,7 @@ class BlockReceiver implements Closeable
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskError(iex);
+        datanode.checkDiskErrorAsync();
         throw iex;
       }
     }
@@ -630,6 +663,20 @@ class BlockReceiver implements Closeable
           lastPacketInBlock, offsetInBlock, Status.SUCCESS);
     }
 
+    /*
+     * Send in-progress responses for replaceBlock() calls back to the caller
+     * to avoid timeouts due to balancer throttling. HDFS-6247
+     */
+    if (isReplaceBlock
+        && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
+      BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+          .setStatus(Status.IN_PROGRESS);
+      response.build().writeDelimitedTo(replyOut);
+      replyOut.flush();
+
+      lastResponseTime = Time.monotonicNow();
+    }
+
     if (throttler != null) { // throttle I/O
       throttler.throttle(len);
     }
@@ -641,6 +688,7 @@ class BlockReceiver implements Closeable
     try {
       if (outFd != null &&
           offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+        long begin = Time.monotonicNow();
         //
         // For SYNC_FILE_RANGE_WRITE, we want to sync from
         // lastCacheManagementOffset to a position "two windows ago"
@@ -651,10 +699,17 @@ class BlockReceiver implements Closeable
         // of file                 
         //
         if (syncBehindWrites) {
-          NativeIO.POSIX.syncFileRangeIfPossible(outFd,
-              lastCacheManagementOffset,
-              offsetInBlock - lastCacheManagementOffset,
-              NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          if (syncBehindWritesInBackground) {
+            this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
+                block, outFd, lastCacheManagementOffset,
+                offsetInBlock - lastCacheManagementOffset,
+                NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          } else {
+            NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+                lastCacheManagementOffset, offsetInBlock
+                    - lastCacheManagementOffset,
+                NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+          }
         }
         //
         // For POSIX_FADV_DONTNEED, we want to drop from the beginning 
@@ -673,18 +728,29 @@ class BlockReceiver implements Closeable
               NativeIO.POSIX.POSIX_FADV_DONTNEED);
         }
         lastCacheManagementOffset = offsetInBlock;
+        long duration = Time.monotonicNow() - begin;
+        if (duration > datanodeSlowLogThresholdMs) {
+          LOG.warn("Slow manageWriterOsCache took " + duration
+              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+        }
       }
     } catch (Throwable t) {
       LOG.warn("Error managing cache for writer of block " + block, t);
     }
   }
-
+  
+  public void sendOOB() throws IOException, InterruptedException {
+    ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
+        .getRestartOOBStatus());
+  }
+  
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      DatanodeInfo[] downstreams) throws IOException {
+      DatanodeInfo[] downstreams,
+      boolean isReplaceBlock) throws IOException {
 
       syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
@@ -692,6 +758,9 @@ class BlockReceiver implements Closeable
       mirrorAddr = mirrAddr;
       throttler = throttlerArg;
 
+      this.replyOut = replyOut;
+      this.isReplaceBlock = isReplaceBlock;
+
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
@@ -766,9 +835,7 @@ class BlockReceiver implements Closeable
               // The worst case is not recovering this RBW replica. 
               // Client will fall back to regular pipeline recovery.
             }
-            try {
-              ((PacketResponder) responder.getRunnable()).
-                  sendOOBResponse(PipelineAck.getRestartOOBStatus());
+            try {              
               // Even if the connection is closed after the ack packet is
               // flushed, the client can react to the connection closure 
               // first. Insert a delay to lower the chance of client 
@@ -776,8 +843,6 @@ class BlockReceiver implements Closeable
               Thread.sleep(1000);
             } catch (InterruptedException ie) {
               // It is already going down. Ignore this.
-            } catch (IOException ioe) {
-              LOG.info("Error sending OOB Ack.", ioe);
             }
           }
           responder.interrupt();
@@ -956,9 +1021,9 @@ class BlockReceiver implements Closeable
     
     /**
      * enqueue the seqno that is still be to acked by the downstream datanode.
-     * @param seqno
-     * @param lastPacketInBlock
-     * @param offsetInBlock
+     * @param seqno sequence number of the packet
+     * @param lastPacketInBlock if true, this is the last packet in block
+     * @param offsetInBlock offset of this packet in block
      */
     void enqueue(final long seqno, final boolean lastPacketInBlock,
         final long offsetInBlock, final Status ackStatus) {
@@ -1174,11 +1239,7 @@ class BlockReceiver implements Closeable
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
-            }
+            datanode.checkDiskErrorAsync();
             LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
@@ -1306,9 +1367,15 @@ class BlockReceiver implements Closeable
         replicaInfo.setBytesAcked(offsetInBlock);
       }
       // send my ack back to upstream datanode
+      long begin = Time.monotonicNow();
       replyAck.write(upstreamOut);
       upstreamOut.flush();
-      if (LOG.isDebugEnabled()) {
+      long duration = Time.monotonicNow() - begin;
+      if (duration > datanodeSlowLogThresholdMs) {
+        LOG.warn("Slow PacketResponder send ack to upstream took " + duration
+            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
+            + ", replyAck=" + replyAck);
+      } else if (LOG.isDebugEnabled()) {
         LOG.debug(myString + ", replyAck=" + replyAck);
       }
 

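Several hunks in the BlockReceiver change above repeat the same idea: time an I/O step with the monotonic clock and log a warning only when it exceeds the configured threshold. A small hedged sketch of that pattern as a reusable helper; the class and method names are illustrative, not the committed code.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Time;

// Illustrative sketch of the slow-operation warning pattern: measure with a
// monotonic clock and warn only when the configured threshold is exceeded.
class SlowOperationLogger {
  private static final Log LOG = LogFactory.getLog(SlowOperationLogger.class);
  private final long thresholdMs;

  SlowOperationLogger(long thresholdMs) {
    this.thresholdMs = thresholdMs;
  }

  void run(String operationName, Runnable operation) {
    long begin = Time.monotonicNow();   // immune to wall-clock adjustments
    try {
      operation.run();
    } finally {
      long duration = Time.monotonicNow() - begin;
      if (duration > thresholdMs) {
        LOG.warn("Slow " + operationName + " took " + duration
            + "ms (threshold=" + thresholdMs + "ms)");
      }
    }
  }
}

In the committed change the threshold is not a constructor argument but comes from the new DNConf field datanodeSlowIoWarningThresholdMs.
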
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Aug 19 23:49:39 2014
@@ -168,7 +168,7 @@ class BlockSender implements java.io.Clo
    * @param block Block that is being read
    * @param startOffset starting offset to read from
    * @param length length of data to read
-   * @param corruptChecksumOk
+   * @param corruptChecksumOk if true, corrupt checksum is okay
    * @param verifyChecksum verify checksum while reading the data
    * @param sendChecksum send checksum to client.
    * @param datanode datanode from which the block is being read
@@ -687,7 +687,7 @@ class BlockSender implements java.io.Clo
     // Trigger readahead of beginning of file if configured.
     manageOsCache();
 
-    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+    final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
     try {
       int maxChunksPerPacket;
       int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
@@ -733,9 +733,9 @@ class BlockSender implements java.io.Clo
         sentEntireByteRange = true;
       }
     } finally {
-      if (clientTraceFmt != null) {
+      if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
         final long endTime = System.nanoTime();
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+        ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
             initialOffset, endTime - startTime));
       }
       close();

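The BlockSender change demotes the per-read client trace from INFO to DEBUG and only takes the timestamps and formats the trace string when DEBUG logging is actually enabled, so the common case pays nothing. A hedged sketch of that guard-before-format pattern; the class and method names here are invented for illustration.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Illustrative sketch: guard both the timing and the String.format behind
// isDebugEnabled(), as the BlockSender change does.
class ClientTraceSketch {
  private static final Log ClientTraceLog =
      LogFactory.getLog(ClientTraceSketch.class);

  void sendBlock(String clientTraceFmt, long totalRead, long initialOffset) {
    // Skip the nanoTime() call entirely when the trace would be discarded.
    final long startTime =
        ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
    try {
      // ... send the requested byte range to the client ...
    } finally {
      if (clientTraceFmt != null && ClientTraceLog.isDebugEnabled()) {
        final long endTime = System.nanoTime();
        // String.format is only paid for when the line will be emitted.
        ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
            initialOffset, endTime - startTime));
      }
    }
  }
}
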
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Aug 19 23:49:39 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 
 /**
  * Simple class encapsulating all of the configuration that the DataNode
@@ -67,6 +69,7 @@ public class DNConf {
   final boolean transferToAllowed;
   final boolean dropCacheBehindWrites;
   final boolean syncBehindWrites;
+  final boolean syncBehindWritesInBackground;
   final boolean dropCacheBehindReads;
   final boolean syncOnClose;
   final boolean encryptDataTransfer;
@@ -79,10 +82,13 @@ public class DNConf {
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
+  final long dfsclientSlowIoWarningThresholdMs;
+  final long datanodeSlowIoWarningThresholdMs;
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
   
   final long xceiverStopTimeout;
@@ -117,6 +123,9 @@ public class DNConf {
     syncBehindWrites = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
         DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    syncBehindWritesInBackground = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT);
     dropCacheBehindReads = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
@@ -129,7 +138,14 @@ public class DNConf {
                                             DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
-    
+
+    this.dfsclientSlowIoWarningThresholdMs = conf.getLong(
+        DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+        DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+    this.datanodeSlowIoWarningThresholdMs = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
+        DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+
     long initBRDelay = conf.getLong(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
         DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
@@ -155,6 +171,8 @@ public class DNConf {
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+    this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+      conf);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -168,12 +186,31 @@ public class DNConf {
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
   }
-  
+
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
-  
+
+  /**
+   * Returns true if encryption is enabled for DataTransferProtocol.
+   *
+   * @return true if encryption is enabled for DataTransferProtocol
+   */
+  public boolean getEncryptDataTransfer() {
+    return encryptDataTransfer;
+  }
+
+  /**
+   * Returns encryption algorithm configured for DataTransferProtocol, or null
+   * if not configured.
+   *
+   * @return encryption algorithm configured for DataTransferProtocol
+   */
+  public String getEncryptionAlgorithm() {
+    return encryptionAlgorithm;
+  }
+
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
@@ -181,4 +218,24 @@ public class DNConf {
   public long getMaxLockedMemory() {
     return maxLockedMemory;
   }
+
+  /**
+   * Returns the SaslPropertiesResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+   */
+  public SaslPropertiesResolver getSaslPropsResolver() {
+    return saslPropsResolver;
+  }
+
+  /**
+   * Returns the TrustedChannelResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return TrustedChannelResolver configured for use with DataTransferProtocol
+   */
+  public TrustedChannelResolver getTrustedChannelResolver() {
+    return trustedChannelResolver;
+  }
 }

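For readers unfamiliar with the pattern, each new DNConf threshold above is simply a long read from Configuration with a default taken from DFSConfigKeys. A minimal sketch of the same read-with-default step, using a hypothetical key name and default rather than the real constants.

import org.apache.hadoop.conf.Configuration;

// Illustrative sketch only; the commit uses DFSConfigKeys constants such as
// DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, not the key below.
class SlowIoThresholdConf {
  static final String EXAMPLE_KEY =
      "example.datanode.slow.io.warning.threshold.ms";
  static final long EXAMPLE_DEFAULT = 300L;

  final long datanodeSlowIoWarningThresholdMs;

  SlowIoThresholdConf(Configuration conf) {
    // Returns the configured value, or the default when the key is unset.
    this.datanodeSlowIoWarningThresholdMs =
        conf.getLong(EXAMPLE_KEY, EXAMPLE_DEFAULT);
  }
}
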
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Aug 19 23:49:39 2014
@@ -131,8 +131,7 @@ public class DataBlockScanner implements
   private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
     
     String nextBpId = null;
-    while ((nextBpId == null) && datanode.shouldRun
-        && !blockScannerThread.isInterrupted()) {
+    while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
       waitForInit();
       synchronized (this) {
         if (getBlockPoolSetSize() > 0) {          

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug 19 23:49:39 2014
@@ -17,10 +17,68 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpConfig;
@@ -82,25 +171,21 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -185,6 +270,7 @@ public class DataNode extends Configured
   public final static String EMPTY_DEL_HINT = "";
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
+  DataXceiverServer xserver = null;
   Daemon localDataXceiverServer = null;
   ShortCircuitRegistry shortCircuitRegistry = null;
   ThreadGroup threadGroup = null;
@@ -227,8 +313,18 @@ public class DataNode extends Configured
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
+  SaslDataTransferClient saslClient;
+  SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
+  private Thread checkDiskErrorThread = null;
+  protected final int checkDiskErrorInterval = 5*1000;
+  private boolean checkDiskErrorFlag = false;
+  private Object checkDiskErrorMutex = new Object();
+  private long lastDiskErrorCheck;
+  private String supergroup;
+  private boolean isPermissionEnabled;
+  private String dnUserName = null;
 
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
@@ -238,6 +334,7 @@ public class DataNode extends Configured
            final List<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
+    this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
 
@@ -249,6 +346,11 @@ public class DataNode extends Configured
     this.getHdfsBlockLocationsEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+    this.isPermissionEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -429,6 +531,33 @@ public class DataNode extends Configured
       ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
     }
   }
+
+  /** Check whether the current user is in the superuser group. */
+  private void checkSuperuserPrivilege() throws IOException, AccessControlException {
+    if (!isPermissionEnabled) {
+      return;
+    }
+    // Try to get the ugi in the RPC call.
+    UserGroupInformation callerUgi = ipcServer.getRemoteUser();
+    if (callerUgi == null) {
+      // This is not from RPC.
+      callerUgi = UserGroupInformation.getCurrentUser();
+    }
+
+    // Is this by the DN user itself?
+    assert dnUserName != null;
+    if (callerUgi.getShortUserName().equals(dnUserName)) {
+      return;
+    }
+
+    // Is the user a member of the super group?
+    List<String> groups = Arrays.asList(callerUgi.getGroupNames());
+    if (groups.contains(supergroup)) {
+      return;
+    }
+    // Not a superuser.
+    throw new AccessControlException();
+  }
   
 /**
  * Initialize the datanode's periodic scanners:
@@ -521,8 +650,8 @@ public class DataNode extends Configured
     streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
-    this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(tcpPeerServer, conf, this));
+    xserver = new DataXceiverServer(tcpPeerServer, conf, this);
+    this.dataXceiverServer = new Daemon(threadGroup, xserver);
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
     if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
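
For illustration, the access decision implemented by the new checkSuperuserPrivilege() above can be sketched in isolation. This is only a sketch: DN_USER and SUPERGROUP below are placeholder values standing in for dnUserName and dfs.permissions.superusergroup, and the real method also resolves the caller from the RPC context.

    import java.util.Arrays;

    import org.apache.hadoop.security.AccessControlException;
    import org.apache.hadoop.security.UserGroupInformation;

    public class SuperuserCheckSketch {
      // Placeholder values; the DataNode reads these from its login user and conf.
      static final String DN_USER = "hdfs";
      static final String SUPERGROUP = "supergroup";

      static void checkSuperuser(UserGroupInformation callerUgi)
          throws AccessControlException {
        // Allowed if the caller is the DataNode user itself...
        if (callerUgi.getShortUserName().equals(DN_USER)) {
          return;
        }
        // ...or belongs to the configured superuser group.
        if (Arrays.asList(callerUgi.getGroupNames()).contains(SUPERGROUP)) {
          return;
        }
        // Everyone else is rejected, which is what protects refreshNamenodes(),
        // deleteBlockPool() and shutdownDatanode() later in this patch.
        throw new AccessControlException("Superuser privilege is required");
      }

      public static void main(String[] args) throws Exception {
        checkSuperuser(UserGroupInformation.getCurrentUser());
        System.out.println("current user passes the superuser check");
      }
    }
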
@@ -646,7 +775,6 @@ public class DataNode extends Configured
   
   /**
    * Return the BPOfferService instance corresponding to the given block.
-   * @param block
    * @return the BPOS
    * @throws IOException if no such BPOS can be found
    */
@@ -685,15 +813,10 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      List<StorageLocation> dataDirs,
-                    // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
-    if(UserGroupInformation.isSecurityEnabled() && resources == null) {
-      if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
-        throw new RuntimeException("Cannot start secure cluster without "
-            + "privileged resources.");
-      }
-    }
+
+    checkSecureConfig(conf, resources);
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
@@ -708,15 +831,19 @@ public class DataNode extends Configured
             " size (%s) is greater than zero and native code is not available.",
             DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
       }
-      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
-      if (dnConf.maxLockedMemory > ulimit) {
-      throw new RuntimeException(String.format(
-          "Cannot start datanode because the configured max locked memory" +
-          " size (%s) of %d bytes is more than the datanode's available" +
-          " RLIMIT_MEMLOCK ulimit of %d bytes.",
-          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-          dnConf.maxLockedMemory,
-          ulimit));
+      if (Path.WINDOWS) {
+        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
+      } else {
+        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
+        if (dnConf.maxLockedMemory > ulimit) {
+          throw new RuntimeException(String.format(
+            "Cannot start datanode because the configured max locked memory" +
+            " size (%s) of %d bytes is more than the datanode's available" +
+            " RLIMIT_MEMLOCK ulimit of %d bytes.",
+            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+            dnConf.maxLockedMemory,
+            ulimit));
+        }
       }
     }
     LOG.info("Starting DataNode with maxLockedMemory = " +
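
The new Path.WINDOWS branch above sidesteps the RLIMIT_MEMLOCK comparison on Windows by growing the process working set instead. A minimal sketch of the POSIX-side validation, assuming the native library is loaded so the memlock limit can be queried:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.io.nativeio.NativeIO;

    public class MemlockCheckSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long maxLocked = conf.getLong(
            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
        System.out.println("configured=" + maxLocked
            + " bytes, RLIMIT_MEMLOCK=" + ulimit + " bytes");
        // The DataNode refuses to start when the configured value exceeds the ulimit.
        if (maxLocked > ulimit) {
          System.err.println("dfs.datanode.max.locked.memory exceeds the memlock ulimit");
        }
      }
    }
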
@@ -733,16 +860,71 @@ public class DataNode extends Configured
   
     // BlockPoolTokenSecretManager is required to create ipc server.
     this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
+
+    // Login is done by now. Set the DN user name.
+    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+    LOG.info("dnUserName = " + dnUserName);
+    LOG.info("supergroup = " + supergroup);
     initIpcServer(conf);
 
     metrics = DataNodeMetrics.create(conf, getDisplayName());
-
+    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+    
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(conf);
 
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
+    saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+      dnConf.trustedChannelResolver,
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+  }
+
+  /**
+   * Checks if the DataNode has a secure configuration if security is enabled.
+   * There are 2 possible configurations that are considered secure:
+   * 1. The server has bound to privileged ports for RPC and HTTP via
+   *   SecureDataNodeStarter.
+   * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+   *   plain HTTP) for the HTTP server.  The SASL handshake guarantees
+   *   authentication of the RPC server before a client transmits a secret, such
+   *   as a block access token.  Similarly, SSL guarantees authentication of the
+   *   HTTP server before a client transmits a secret, such as a delegation
+   *   token.
+   * It is not possible to run with both privileged ports and SASL on
+   * DataTransferProtocol.  For backwards-compatibility, the connection logic
+   * must check if the target port is a privileged port, and if so, skip the
+   * SASL handshake.
+   *
+   * @param conf Configuration to check
+   * @param resources SecuredResources obtained for DataNode
+   * @throws RuntimeException if security enabled, but configuration is insecure
+   */
+  private static void checkSecureConfig(Configuration conf,
+      SecureResources resources) throws RuntimeException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+    if (resources != null && dataTransferProtection == null) {
+      return;
+    }
+    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+      return;
+    }
+    if (dataTransferProtection != null &&
+        DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+        resources == null) {
+      return;
+    }
+    throw new RuntimeException("Cannot start secure DataNode without " +
+      "configuring either privileged resources or SASL RPC data transfer " +
+      "protection and SSL for HTTP.  Using privileged resources in " +
+      "combination with SASL RPC data transfer protection is not supported.");
   }
   
   public static String generateUuid() {
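
The javadoc above enumerates the accepted secure deployments. As a condensed restatement (illustrative only; it omits the ignore.secure.ports.for.testing override and assumes the same DFSConfigKeys constants the hunk static-imports):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.http.HttpConfig;

    public class SecureConfigSketch {
      /** @param hasPrivilegedResources true when started via SecureDataNodeStarter */
      static boolean isAcceptable(Configuration conf, boolean hasPrivilegedResources) {
        String protection = conf.get(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY);
        boolean httpsOnly =
            DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY;
        // 1. Privileged ports via SecureDataNodeStarter, no SASL on DataTransferProtocol.
        if (hasPrivilegedResources && protection == null) {
          return true;
        }
        // 2. SASL on DataTransferProtocol plus HTTPS-only web UI, no privileged ports.
        if (protection != null && httpsOnly && !hasPrivilegedResources) {
          return true;
        }
        // Everything else, including both mechanisms at once, is rejected at startup.
        return false;
      }
    }
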
@@ -811,9 +993,7 @@ public class DataNode extends Configured
   /**
    * After the block pool has contacted the NN, registers that block pool
    * with the secret manager, updating it with the secrets provided by the NN.
-   * @param bpRegistration
-   * @param blockPoolId
-   * @throws IOException
+   * @throws IOException on error
    */
   private synchronized void registerBlockPoolWithSecretManager(
       DatanodeRegistration bpRegistration, String blockPoolId) throws IOException {
@@ -850,19 +1030,24 @@ public class DataNode extends Configured
    */
   void shutdownBlockPool(BPOfferService bpos) {
     blockPoolManager.remove(bpos);
+    if (bpos.hasBlockPoolId()) {
+      // Possible that this is shutting down before successfully
+      // registering anywhere. If that's the case, we wouldn't have
+      // a block pool id
+      String bpId = bpos.getBlockPoolId();
+      if (blockScanner != null) {
+        blockScanner.removeBlockPool(bpId);
+      }
 
-    String bpId = bpos.getBlockPoolId();
-    if (blockScanner != null) {
-      blockScanner.removeBlockPool(bpId);
-    }
-  
-    if (data != null) { 
-      data.shutdownBlockPool(bpId);
-    }
+      if (data != null) {
+        data.shutdownBlockPool(bpId);
+      }
 
-    if (storage != null) {
-      storage.removeBlockPoolStorage(bpId);
+      if (storage != null) {
+        storage.removeBlockPoolStorage(bpId);
+      }
     }
+
   }
 
   /**
@@ -883,14 +1068,19 @@ public class DataNode extends Configured
           + " should have retrieved namespace info before initBlockPool.");
     }
     
+    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
+
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
-
-    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
     
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
     initStorage(nsInfo);
+
+    // Exclude failed disks before initializing the block pools to avoid startup
+    // failures.
+    checkDiskError();
+
     initPeriodicScanners(conf);
     
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
@@ -949,6 +1139,11 @@ public class DataNode extends Configured
   }
   
   @VisibleForTesting
+  public DataXceiverServer getXferServer() {
+    return xserver;  
+  }
+  
+  @VisibleForTesting
   public int getXferPort() {
     return streamingAddr.getPort();
   }
@@ -981,9 +1176,8 @@ public class DataNode extends Configured
   
   /**
    * get BP registration by blockPool id
-   * @param bpid
    * @return BP registration object
-   * @throws IOException
+   * @throws IOException on error
    */
   @VisibleForTesting
   public DatanodeRegistration getDNRegistrationForBP(String bpid) 
@@ -1071,6 +1265,7 @@ public class DataNode extends Configured
       Token<BlockTokenIdentifier> token) throws IOException {
     checkBlockLocalPathAccess();
     checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+    Preconditions.checkNotNull(data, "Storage not yet initialized");
     BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
     if (LOG.isDebugEnabled()) {
       if (info != null) {
@@ -1206,12 +1401,18 @@ public class DataNode extends Configured
     // in order to avoid any further acceptance of requests, but the peers
     // for block writes are not closed until the clients are notified.
     if (dataXceiverServer != null) {
+      xserver.sendOOBToPeers();
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
     }
 
+    // Interrupt the checkDiskErrorThread and terminate it.
+    if(this.checkDiskErrorThread != null) {
+      this.checkDiskErrorThread.interrupt();
+    }
+    
     // Record the time of initial notification
-    long timeNotified = Time.now();
+    long timeNotified = Time.monotonicNow();
 
     if (localDataXceiverServer != null) {
       ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
@@ -1243,8 +1444,9 @@ public class DataNode extends Configured
       while (true) {
         // When shutting down for restart, wait 2.5 seconds before forcing
         // termination of receiver threads.
-        if (!this.shutdownForUpgrade || 
-            (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) {
+        if (!this.shutdownForUpgrade ||
+            (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
+                > 2500))) {
           this.threadGroup.interrupt();
         }
         LOG.info("Waiting for threadgroup to exit, active threads is " +
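
The switch from Time.now() to Time.monotonicNow() above matters because the value is only used to measure elapsed time since the shutdown notification; a monotonic source cannot go backwards if the wall clock is stepped. A small sketch of the idiom:

    import org.apache.hadoop.util.Time;

    public class ElapsedTimeSketch {
      public static void main(String[] args) throws InterruptedException {
        long start = Time.monotonicNow();   // monotonic milliseconds, not wall time
        Thread.sleep(100);
        long elapsed = Time.monotonicNow() - start;
        // With Time.now() this difference could be negative or huge if the
        // system clock were adjusted while waiting.
        System.out.println("elapsed ~" + elapsed + " ms");
      }
    }
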
@@ -1319,55 +1521,17 @@ public class DataNode extends Configured
   }
   
   
-  /** Check if there is no space in disk 
-   *  @param e that caused this checkDiskError call
-   **/
-  protected void checkDiskError(Exception e ) throws IOException {
-    
-    LOG.warn("checkDiskError: exception: ", e);
-    if (isNetworkRelatedException(e)) {
-      LOG.info("Not checking disk as checkDiskError was called on a network" +
-      		" related exception");	
-      return;
-    }
-    if (e.getMessage() != null &&
-        e.getMessage().startsWith("No space left on device")) {
-      throw new DiskOutOfSpaceException("No space left on device");
-    } else {
-      checkDiskError();
-    }
-  }
-  
   /**
-   * Check if the provided exception looks like it's from a network error
-   * @param e the exception from a checkDiskError call
-   * @return true if this exception is network related, false otherwise
+   * Asynchronously check for a disk failure and, if one is found, handle the error
    */
-  protected boolean isNetworkRelatedException(Exception e) {
-    if (e instanceof SocketException 
-        || e instanceof SocketTimeoutException
-        || e instanceof ClosedChannelException 
-        || e instanceof ClosedByInterruptException) {
-      return true;
-    }
-    
-    String msg = e.getMessage();
-    
-    return null != msg 
-        && (msg.startsWith("An established connection was aborted")
-            || msg.startsWith("Broken pipe")
-            || msg.startsWith("Connection reset")
-            || msg.contains("java.nio.channels.SocketChannel"));
-  }
-  
-  /**
-   *  Check if there is a disk failure and if so, handle the error
-   */
-  public void checkDiskError() {
-    try {
-      data.checkDataDir();
-    } catch (DiskErrorException de) {
-      handleDiskError(de.getMessage());
+  public void checkDiskErrorAsync() {
+    synchronized(checkDiskErrorMutex) {
+      checkDiskErrorFlag = true;
+      if(checkDiskErrorThread == null) {
+        startCheckDiskErrorThread();
+        checkDiskErrorThread.start();
+        LOG.info("Starting CheckDiskError Thread");
+      }
     }
   }
   
@@ -1405,8 +1569,8 @@ public class DataNode extends Configured
     return xmitsInProgress.get();
   }
     
-  private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
-      throws IOException {
+  private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+      StorageType[] xferTargetStorageTypes) throws IOException {
     BPOfferService bpos = getBPOSForBlock(block);
     DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
     
@@ -1442,16 +1606,17 @@ public class DataNode extends Configured
       LOG.info(bpReg + " Starting thread to transfer " + 
                block + " to " + xfersBuilder);                       
 
-      new Daemon(new DataTransfer(xferTargets, block,
+      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
           BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
   void transferBlocks(String poolId, Block blocks[],
-      DatanodeInfo xferTargets[][]) {
+      DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
     for (int i = 0; i < blocks.length; i++) {
       try {
-        transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+        transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+            xferTargetStorageTypes[i]);
       } catch (IOException ie) {
         LOG.warn("Failed to transfer block " + blocks[i], ie);
       }
@@ -1554,6 +1719,7 @@ public class DataNode extends Configured
    */
   private class DataTransfer implements Runnable {
     final DatanodeInfo[] targets;
+    final StorageType[] targetStorageTypes;
     final ExtendedBlock b;
     final BlockConstructionStage stage;
     final private DatanodeRegistration bpReg;
@@ -1564,7 +1730,8 @@ public class DataNode extends Configured
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+    DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+        ExtendedBlock b, BlockConstructionStage stage,
         final String clientname) {
       if (DataTransferProtocol.LOG.isDebugEnabled()) {
         DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1574,6 +1741,7 @@ public class DataNode extends Configured
             + ", targests=" + Arrays.asList(targets));
       }
       this.targets = targets;
+      this.targetStorageTypes = targetStorageTypes;
       this.b = b;
       this.stage = stage;
       BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1605,20 +1773,25 @@ public class DataNode extends Configured
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
+        //
+        // Header info
+        //
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+        }
+
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dnConf.encryptDataTransfer && 
-            !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn,
-                  blockPoolTokenSecretManager.generateDataEncryptionKey(
-                      b.getBlockPoolId()));
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        DataEncryptionKeyFactory keyFactory =
+          getDataEncryptionKeyFactoryForBlock(b);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, keyFactory, accessToken, bpReg);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1627,16 +1800,8 @@ public class DataNode extends Configured
             false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
-        //
-        // Header info
-        //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
-        }
-
-        new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+        new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+            clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
         // send data & checksum
@@ -1667,13 +1832,8 @@ public class DataNode extends Configured
       } catch (IOException ie) {
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
-          // check if there are any disk problem
-        try{
-          checkDiskError(ie);
-        } catch(IOException e) {
-            LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
-        }
-        
+        // check if there are any disk problem
+        checkDiskErrorAsync();
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -1683,12 +1843,32 @@ public class DataNode extends Configured
       }
     }
   }
-  
+
+  /**
+   * Returns a new DataEncryptionKeyFactory that generates a key from the
+   * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+   *
+   * @param block for which the factory needs to create a key
+   * @return DataEncryptionKeyFactory for block's block pool ID
+   */
+  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+      final ExtendedBlock block) {
+    return new DataEncryptionKeyFactory() {
+      @Override
+      public DataEncryptionKey newDataEncryptionKey() {
+        return dnConf.encryptDataTransfer ?
+          blockPoolTokenSecretManager.generateDataEncryptionKey(
+            block.getBlockPoolId()) : null;
+      }
+    };
+  }
+
   /**
    * After a block becomes finalized, a datanode increases metric counter,
    * notifies namenode, and adds it to the block scanner
-   * @param block
-   * @param delHint
+   * @param block block to close
+   * @param delHint hint on which excess block to delete
+   * @param storageUuid UUID of the storage where block is stored
    */
   void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
     metrics.incrBlocksWritten();
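
getDataEncryptionKeyFactoryForBlock() above hands the SASL client a callback that produces a key only when dfs.encrypt.data.transfer is on. For comparison, a trivial standalone implementation of the same interface (assuming the DataEncryptionKeyFactory type introduced by this commit's new SASL imports) that never encrypts:

    import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;

    public class PlaintextKeyFactory implements DataEncryptionKeyFactory {
      @Override
      public DataEncryptionKey newDataEncryptionKey() {
        // Equivalent to the dnConf.encryptDataTransfer == false branch above:
        // a null key means the transfer proceeds without encryption.
        return null;
      }
    }
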
@@ -1776,8 +1956,15 @@ public class DataNode extends Configured
       try {
         location = StorageLocation.parse(locationString);
       } catch (IOException ioe) {
-        throw new IllegalArgumentException("Failed to parse conf property "
-            + DFS_DATANODE_DATA_DIR_KEY + ": " + locationString, ioe);
+        LOG.error("Failed to initialize storage directory " + locationString
+            + ". Exception details: " + ioe);
+        // Ignore the exception.
+        continue;
+      } catch (SecurityException se) {
+        LOG.error("Failed to initialize storage directory " + locationString
+                     + ". Exception details: " + se);
+        // Ignore the exception.
+        continue;
       }
 
       locations.add(location);
@@ -2282,11 +2469,11 @@ public class DataNode extends Configured
 
   @Override // ClientDataNodeProtocol
   public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
-    checkWriteAccess(block);
+    checkReadAccess(block);
     return data.getReplicaVisibleLength(block);
   }
 
-  private void checkWriteAccess(final ExtendedBlock block) throws IOException {
+  private void checkReadAccess(final ExtendedBlock block) throws IOException {
     if (isBlockTokenEnabled) {
       Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
           .getTokenIdentifiers();
@@ -2311,11 +2498,12 @@ public class DataNode extends Configured
    *          The corresponding replica must be an RBW or a Finalized.
    *          Its GS and numBytes will be set to
    *          the stored GS and the visible length. 
-   * @param targets
-   * @param client
+   * @param targets targets to transfer the block to
+   * @param client client name
    */
   void transferReplicaForPipelineRecovery(final ExtendedBlock b,
-      final DatanodeInfo[] targets, final String client) throws IOException {
+      final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+      final String client) throws IOException {
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2348,7 +2536,7 @@ public class DataNode extends Configured
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      new DataTransfer(targets, b, stage, client).run();
+      new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
     }
   }
 
@@ -2423,6 +2611,7 @@ public class DataNode extends Configured
    */
   @Override // DataNodeMXBean
   public String getVolumeInfo() {
+    Preconditions.checkNotNull(data, "Storage not yet initialized");
     return JSON.toString(data.getVolumeInfoMap());
   }
   
@@ -2437,6 +2626,7 @@ public class DataNode extends Configured
 
   @Override // ClientDatanodeProtocol
   public void refreshNamenodes() throws IOException {
+    checkSuperuserPrivilege();
     conf = new Configuration();
     refreshNamenodes(conf);
   }
@@ -2444,6 +2634,7 @@ public class DataNode extends Configured
   @Override // ClientDatanodeProtocol
   public void deleteBlockPool(String blockPoolId, boolean force)
       throws IOException {
+    checkSuperuserPrivilege();
     LOG.info("deleteBlockPool command received for block pool " + blockPoolId
         + ", force=" + force);
     if (blockPoolManager.get(blockPoolId) != null) {
@@ -2459,6 +2650,7 @@ public class DataNode extends Configured
 
   @Override // ClientDatanodeProtocol
   public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
+    checkSuperuserPrivilege();
     LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
         "). Shutting down Datanode...");
 
@@ -2579,4 +2771,59 @@ public class DataNode extends Configured
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
+
+  /**
+   * Check the disk error
+   */
+  private void checkDiskError() {
+    try {
+      data.checkDataDir();
+    } catch (DiskErrorException de) {
+      handleDiskError(de.getMessage());
+    }
+  }
+
+  /**
+   * Starts a new thread which polls for disk error check requests
+   * every 5 seconds
+   */
+  private void startCheckDiskErrorThread() {
+    checkDiskErrorThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            while(shouldRun) {
+              boolean tempFlag;
+              synchronized(checkDiskErrorMutex) {
+                tempFlag = checkDiskErrorFlag;
+                checkDiskErrorFlag = false;
+              }
+              if(tempFlag) {
+                try {
+                  checkDiskError();
+                } catch (Exception e) {
+                  LOG.warn("Unexpected exception occurred while checking disk error: " + e);
+                  checkDiskErrorThread = null;
+                  return;
+                }
+                synchronized(checkDiskErrorMutex) {
+                  lastDiskErrorCheck = Time.monotonicNow();
+                }
+              }
+              try {
+                Thread.sleep(checkDiskErrorInterval);
+              } catch (InterruptedException e) {
+                LOG.debug("InterruptedException in check disk error thread", e);
+                checkDiskErrorThread = null;
+                return;
+              }
+            }
+          }
+    });
+  }
+  
+  public long getLastDiskErrorCheck() {
+    synchronized(checkDiskErrorMutex) {
+      return lastDiskErrorCheck;
+    }
+  }
 }
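
The checkDiskErrorAsync()/startCheckDiskErrorThread() pair added above follows a simple request-flag pattern: callers flip a flag under a mutex and a single polling thread performs the expensive directory scan at most once per interval. A generic sketch of that pattern, independent of the DataNode classes:

    public class AsyncCheckSketch {
      private final Object mutex = new Object();
      private boolean checkRequested = false;
      private volatile boolean shouldRun = true;

      /** Cheap and non-blocking; safe to call from I/O error paths. */
      public void requestCheck() {
        synchronized (mutex) {
          checkRequested = true;
        }
      }

      /** Runs the given check at most once per interval while shouldRun is true. */
      public void startWorker(final Runnable check, final long intervalMs) {
        Thread worker = new Thread(new Runnable() {
          @Override
          public void run() {
            while (shouldRun) {
              boolean doCheck;
              synchronized (mutex) {
                doCheck = checkRequested;
                checkRequested = false;
              }
              if (doCheck) {
                check.run();              // e.g. data.checkDataDir() in the DataNode
              }
              try {
                Thread.sleep(intervalMs); // 5 * 1000 ms in the DataNode
              } catch (InterruptedException ie) {
                return;                   // shutdown interrupts the thread
              }
            }
          }
        });
        worker.setDaemon(true);
        worker.start();
      }

      public void stop() {
        shouldRun = false;
      }
    }
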

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Tue Aug 19 23:49:39 2014
@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {  
    * </ul>
    */
   public static enum Feature implements LayoutFeature {
-    FIRST_LAYOUT(-55, -53, "First datenode layout", false);
+    FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+    BLOCKID_BASED_LAYOUT(-56,
+        "The block ID of a finalized block uniquely determines its position " +
+            "in the directory structure");
    
     private final FeatureInfo info;
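
The new BLOCKID_BASED_LAYOUT feature means a finalized block's directory can be computed from its block ID alone, so the DataNode no longer has to track where each block landed. A rough illustration of the idea; the exact bit widths, directory-name prefix, and helper location are assumptions here, not the shipped implementation:

    import java.io.File;

    public class BlockIdLayoutSketch {
      // Derive two nested directory levels from bits of the block ID so the same
      // ID always maps to the same path, with no lookup table or scan required.
      static File blockDirFor(File bpFinalizedDir, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0x1F);  // assumed width
        int d2 = (int) ((blockId >> 8) & 0x1F);   // assumed width
        return new File(bpFinalizedDir,
            "subdir" + d1 + File.separator + "subdir" + d2);
      }

      public static void main(String[] args) {
        File root = new File("/data/dn/current/finalized");
        System.out.println(blockDirFor(root, 1073741825L));
      }
    }
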