Posted to hdfs-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:47 UTC

svn commit: r1619019 [7/11] - in /hadoop/common/branches/YARN-1051/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-httpfs/src/main/ja...

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Aug 20 01:34:29 2014
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /** 
  * Data storage information file.
@@ -149,43 +173,99 @@ public class DataStorage extends Storage
   }
   
   /**
-   * Analyze storage directories.
-   * Recover from previous transitions if required. 
-   * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info.
-   * <br>
-   * This method should be synchronized between multiple DN threads.  Only the 
-   * first DN thread does DN level storage dir recoverTransitionRead.
-   * 
+   * Writes the layout version and storage properties to each of the given
+   * storage directories. See
+   * {@link org.apache.hadoop.hdfs.server.common.Storage#writeAll()}.
+   */
+  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+    this.layoutVersion = getServiceLayoutVersion();
+    for (StorageDirectory dir : dirs) {
+      writeProperties(dir);
+    }
+  }
+
+  /**
+   * Add a list of volumes to be managed by DataStorage. Each volume that is
+   * empty is formatted; otherwise it is recovered from previous transitions
+   * if required.
+   *
+   * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(DataNode datanode,
+  synchronized void addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt)
       throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
+    // Similar to recoverTransitionRead, this first ensures that the
+    // datanode-level format has completed.
+    List<StorageLocation> tmpDataDirs =
+        new ArrayList<StorageLocation>(dataDirs);
+    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    String bpid = nsInfo.getBlockPoolID();
+    for (StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
     }
-    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and name-node layout version: " + nsInfo.getLayoutVersion());
-    
-    // 1. For each data directory calculate its state and 
-    // check whether all is consistent before transitioning.
-    // Format and recover.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(
+          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+          nsInfo.getClusterID());
+    }
+
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpid, bpStorage);
+  }
+
+  /**
+   * Add a list of volumes to be managed by this DataStorage. Each empty
+   * volume is formatted; otherwise the volume is recovered from previous
+   * transitions if required.
+   *
+   * If isInitialize is false, only the directories that have finished the
+   * doTransition() process will be added to DataStorage.
+   *
+   * @param datanode the reference to DataNode.
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
+   * @param ignoreExistingDirs whether to return silently, instead of throwing,
+   *          when none of the given directories can be added.
+   * @throws IOException
+   */
+  private synchronized void addStorageLocations(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+      throws IOException {
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
+    // 1. For each data directory calculate its state and check whether all is
+    // consistent before transitioning. Format and recover.
+    ArrayList<StorageState> dataDirStates =
+        new ArrayList<StorageState>(dataDirs.size());
+    List<StorageDirectory> addedStorageDirectories =
+        new ArrayList<StorageDirectory>();
     for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next().getFile();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
-        switch(curState) {
+        switch (curState) {
         case NORMAL:
           break;
         case NON_EXISTENT:
@@ -194,7 +274,8 @@ public class DataStorage extends Storage
           it.remove();
           continue;
         case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted");
+          LOG.info("Storage directory " + dataDir + " is not formatted for "
+            + nsInfo.getBlockPoolID());
           LOG.info("Formatting ...");
           format(sd, nsInfo, datanode.getDatanodeUuid());
           break;
@@ -208,33 +289,82 @@ public class DataStorage extends Storage
         //continue with other good dirs
         continue;
       }
-      // add to the storage list
-      addStorageDir(sd);
+      if (isInitialize) {
+        addStorageDir(sd);
+      }
+      addedStorageDirectories.add(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0)  // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+      // none of the data dirs exist
+      if (ignoreExistingDirs) {
+        return;
+      }
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
+    }
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback 
-    // while others could be uptodate for the regular startup.
-    try {
-      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-        doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-        createStorageID(getStorageDir(idx));
+    // During startup some of them can upgrade or rollback
+    // while others could be up-to-date for the regular startup.
+    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+        it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        doTransition(datanode, sd, nsInfo, startOpt);
+        createStorageID(sd);
+      } catch (IOException e) {
+        if (!isInitialize) {
+          sd.unlock();
+          it.remove();
+          continue;
+        }
+        unlockAll();
+        throw e;
       }
-    } catch (IOException e) {
-      unlockAll();
-      throw e;
     }
 
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
+    // 3. Update all successfully loaded storages. Some of them might have just
+    // been formatted.
+    this.writeAll(addedStorageDirectories);
+
+    // 4. Make newly loaded storage directories visible for service.
+    if (!isInitialize) {
+      this.storageDirs.addAll(addedStorageDirectories);
+    }
+  }
+
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required.
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads.  Only the
+   * first DN thread does DN level storage dir recoverTransitionRead.
+   *
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @throws IOException
+   */
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
+      throws IOException {
+    if (initialized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
+    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
     
-    // 4. mark DN storage is initialized
+    // mark DN storage is initialized
     this.initialized = true;
   }
 
@@ -261,6 +391,7 @@ public class DataStorage extends Storage
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
     }
+
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -488,7 +619,7 @@ public class DataStorage extends Storage
     
     // do upgrade
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
-      doUpgrade(sd, nsInfo);  // upgrade
+      doUpgrade(datanode, sd, nsInfo);  // upgrade
       return;
     }
     
@@ -523,7 +654,8 @@ public class DataStorage extends Storage
    * @param sd  storage directory
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+      throws IOException {
     // If the existing on-disk layout version supports federation, simply
     // update its layout version.
     if (DataNodeLayoutVersion.supports(
@@ -568,7 +700,8 @@ public class DataStorage extends Storage
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), 
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+        STORAGE_DIR_CURRENT));
     
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -746,22 +879,22 @@ public class DataStorage extends Storage
    *
    * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
-      throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+      File toDir) throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
-      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
       // hardlink rbw blocks in tmpDir/rbw
-      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
           new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
-      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
           diskLayoutVersion, hardLink);      
       if (fromBbwDir.exists()) {
         /*
@@ -770,15 +903,67 @@ public class DataStorage extends Storage
          * NOT underneath the 'current' directory in those releases.  See
          * HDFS-3731 for details.
          */
-        linkBlocks(fromBbwDir,
+        linkBlocks(datanode, fromBbwDir,
             new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
       }
     } 
     LOG.info( hardLink.linkStats.report() );
   }
+
+  private static class LinkArgs {
+    public File src;
+    public File dst;
+
+    public LinkArgs(File src, File dst) {
+      this.src = src;
+      this.dst = dst;
+    }
+  }
+
+  static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+      HardLink hl) throws IOException {
+    boolean upgradeToIdBasedLayout = false;
+    // If we are upgrading from a version older than the one where we introduced
+    // block ID-based layout AND we're working with the finalized directory,
+    // we'll need to upgrade from the old flat layout to the block ID-based one
+    if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+        getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+      upgradeToIdBasedLayout = true;
+    }
+
+    final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+    linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+        idBasedLayoutSingleLinks);
+    int numLinkWorkers = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+    ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+    final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+      final int iCopy = i;
+      futures.add(linkWorkers.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          int upperBound = Math.min(iCopy + step,
+              idBasedLayoutSingleLinks.size());
+          for (int j = iCopy; j < upperBound; j++) {
+            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+            NativeIO.link(cur.src, cur.dst);
+          }
+          return null;
+        }
+      }));
+    }
+    linkWorkers.shutdown();
+    for (Future<Void> f : futures) {
+      Futures.get(f, IOException.class);
+    }
+  }
   
-  static void linkBlocks(File from, File to, int oldLV, HardLink hl) 
-  throws IOException {
+  static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+  boolean upgradeToIdBasedLayout, File blockRoot,
+      List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
     if (!from.exists()) {
       return;
     }
@@ -805,9 +990,6 @@ public class DataStorage extends Storage
     // from is a directory
     hl.linkStats.countDirs++;
     
-    if (!to.mkdirs())
-      throw new IOException("Cannot create directory " + to);
-    
     String[] blockNames = from.list(new java.io.FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -815,12 +997,36 @@ public class DataStorage extends Storage
       }
     });
 
+    // If we are upgrading to block ID-based layout, we don't want to recreate
+    // any subdirs from the source that contain blocks, since we have a new
+    // directory structure
+    if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+        BLOCK_SUBDIR_PREFIX)) {
+      if (!to.mkdirs())
+        throw new IOException("Cannot create directory " + to);
+    }
+
     // Block files just need hard links with the same file names
     // but a different directory
     if (blockNames.length > 0) {
-      HardLink.createHardLinkMult(from, blockNames, to);
-      hl.linkStats.countMultLinks++;
-      hl.linkStats.countFilesMultLinks += blockNames.length;
+      if (upgradeToIdBasedLayout) {
+        for (String blockName : blockNames) {
+          long blockId = Block.getBlockId(blockName);
+          File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+          if (!blockLocation.exists()) {
+            if (!blockLocation.mkdirs()) {
+              throw new IOException("Failed to mkdirs " + blockLocation);
+            }
+          }
+          idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+              new File(blockLocation, blockName)));
+          hl.linkStats.countSingleLinks++;
+        }
+      } else {
+        HardLink.createHardLinkMult(from, blockNames, to);
+        hl.linkStats.countMultLinks++;
+        hl.linkStats.countFilesMultLinks += blockNames.length;
+      }
     } else {
       hl.linkStats.countEmptyDirs++;
     }
@@ -834,8 +1040,9 @@ public class DataStorage extends Storage
         }
       });
     for(int i = 0; i < otherNames.length; i++)
-      linkBlocks(new File(from, otherNames[i]), 
-          new File(to, otherNames[i]), oldLV, hl);
+      linkBlocksHelper(new File(from, otherNames[i]),
+          new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+          blockRoot, idBasedLayoutSingleLinks);
   }
 
   /**

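The reworked linkBlocks() above fans the per-block hard-link work out over a fixed thread pool: the flat list of LinkArgs is split into contiguous chunks of "step" entries and each chunk is handled by one Callable, with the futures joined at the end. Below is a minimal standalone sketch of that chunking pattern; the task list, worker count, and the placeholder loop body are illustrative stand-ins (the real code performs NativeIO.link(cur.src, cur.dst) per entry).

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ChunkedWorkSketch {
      public static void main(String[] args) throws Exception {
        // Stand-in for idBasedLayoutSingleLinks: one entry per hard link to create.
        final List<String> work = new ArrayList<String>();
        for (int i = 0; i < 1000; i++) {
          work.add("blk_" + i);
        }

        int numWorkers = 4;   // plays the role of the layout-upgrade threads setting
        final int step = work.size() / numWorkers + 1;
        ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();

        // Submit one Callable per contiguous chunk of the work list.
        for (int i = 0; i < work.size(); i += step) {
          final int start = i;
          futures.add(workers.submit(new Callable<Void>() {
            @Override
            public Void call() {
              int end = Math.min(start + step, work.size());
              for (int j = start; j < end; j++) {
                // The real patch calls NativeIO.link(cur.src, cur.dst) here.
                String entry = work.get(j);
                entry.length();   // placeholder for the per-entry work
              }
              return null;
            }
          }));
        }

        workers.shutdown();
        for (Future<Void> f : futures) {
          f.get();   // re-throws any failure from a worker
        }
      }
    }
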
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 20 01:34:29 2014
@@ -103,7 +103,8 @@ class DataXceiver extends Receiver imple
   private long opStartTime; //the start time of receiving an Op
   private final InputStream socketIn;
   private OutputStream socketOut;
-
+  private BlockReceiver blockReceiver = null;
+  
   /**
    * Client Name used in previous operation. Not available on first request
    * on the socket.
@@ -159,6 +160,12 @@ class DataXceiver extends Receiver imple
     return socketOut;
   }
 
+  public void sendOOB() throws IOException, InterruptedException {
+    LOG.info("Sending OOB to peer: " + peer);
+    if (blockReceiver != null) {
+      blockReceiver.sendOOB();
+    }
+  }
+  
   /**
    * Read/write data from/to the DataXceiverServer.
    */
@@ -168,7 +175,7 @@ class DataXceiver extends Receiver imple
     Op op = null;
 
     try {
-      dataXceiverServer.addPeer(peer, Thread.currentThread());
+      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@@ -584,7 +591,6 @@ class DataXceiver extends Receiver imple
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
     Socket mirrorSock = null;           // socket to next target
-    BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
@@ -708,7 +714,7 @@ class DataXceiver extends Receiver imple
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets);
+            mirrorAddr, null, targets, false);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -747,6 +753,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
+      blockReceiver = null;
     }
 
     //update metrics
@@ -983,7 +990,7 @@ class DataXceiver extends Receiver imple
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1047,8 @@ class DataXceiver extends Receiver imple
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, null);
+      blockReceiver.receiveBlock(null, null, replyOut, null, 
+          dataXceiverServer.balanceThrottler, null, true);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
@@ -1076,6 +1083,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
+      IOUtils.closeStream(replyOut);
     }
 
     //update metrics

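The DataXceiver change promotes blockReceiver from a local variable in writeBlock() to an instance field, so sendOOB() can reach whatever receiver is currently active; the field is nulled out in the finally block once the transfer ends. A minimal sketch of that pattern follows; the class and method names are illustrative stand-ins, not the HDFS API.

    public class OobAwareWorker {
      // Hypothetical stand-in for BlockReceiver.
      static class Receiver {
        void receive() throws InterruptedException { Thread.sleep(100); }
        void sendOOB() { System.out.println("OOB delivered"); }
      }

      private volatile Receiver current = null;

      // Called by the worker thread for each transfer.
      public void handleTransfer() throws InterruptedException {
        current = new Receiver();
        try {
          current.receive();
        } finally {
          current = null;   // mirrors "blockReceiver = null" in the finally block
        }
      }

      // Called from another thread, e.g. a server broadcasting a restart notice.
      public void sendOOB() {
        Receiver r = current;
        if (r != null) {
          r.sendOOB();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final OobAwareWorker w = new OobAwareWorker();
        Thread t = new Thread(new Runnable() {
          @Override
          public void run() {
            try { w.handleTransfer(); } catch (InterruptedException ignored) { }
          }
        });
        t.start();
        Thread.sleep(10);
        w.sendOOB();   // reaches the in-flight receiver, or is a no-op
        t.join();
      }
    }
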
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Aug 20 01:34:29 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
   private final PeerServer peerServer;
   private final DataNode datanode;
   private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+  private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
   private boolean closed = false;
   
   /**
@@ -217,18 +218,38 @@ class DataXceiverServer implements Runna
     }
   }
   
-  synchronized void addPeer(Peer peer, Thread t) throws IOException {
+  synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+      throws IOException {
     if (closed) {
       throw new IOException("Server closed.");
     }
     peers.put(peer, t);
+    peersXceiver.put(peer, xceiver);
   }
 
   synchronized void closePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer);
     IOUtils.cleanup(null, peer);
   }
 
+  // Sending OOB to all peers
+  public synchronized void sendOOBToPeers() {
+    if (!datanode.shutdownForUpgrade) {
+      return;
+    }
+
+    for (Peer p : peers.keySet()) {
+      try {
+        peersXceiver.get(p).sendOOB();
+      } catch (IOException e) {
+        LOG.warn("Got error when sending OOB message.", e);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when sending OOB message.");
+      }
+    }
+  }
+  
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
   // be set true before calling this method.
@@ -247,6 +268,7 @@ class DataXceiverServer implements Runna
       IOUtils.cleanup(LOG, p);
     }
     peers.clear();
+    peersXceiver.clear();
   }
 
   // Return the number of peers.
@@ -254,7 +276,14 @@ class DataXceiverServer implements Runna
     return peers.size();
   }
 
+  // Return the number of peers and DataXceivers.
+  @VisibleForTesting
+  synchronized int getNumPeersXceiver() {
+    return peersXceiver.size();
+  }
+  
   synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer);
   }
 }

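DataXceiverServer now keeps a second map, peersXceiver, alongside peers so each Peer can be mapped back to its DataXceiver, and sendOOBToPeers() walks that map catching exceptions per peer so one bad connection does not abort the broadcast. A small self-contained sketch of that broadcast-with-per-peer-isolation idea; the Handler interface and peer keys are hypothetical stand-ins.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class BroadcastSketch {
      interface Handler { void sendOOB() throws IOException; }

      private final Map<String, Handler> handlers = new HashMap<String, Handler>();

      synchronized void add(String peer, Handler h) { handlers.put(peer, h); }
      synchronized void remove(String peer)        { handlers.remove(peer); }

      // Mirrors sendOOBToPeers(): one misbehaving peer must not stop the broadcast.
      synchronized void broadcast() {
        for (Map.Entry<String, Handler> e : handlers.entrySet()) {
          try {
            e.getValue().sendOOB();
          } catch (IOException ioe) {
            System.err.println("Got error when sending OOB to " + e.getKey() + ": " + ioe);
          }
        }
      }

      public static void main(String[] args) {
        BroadcastSketch s = new BroadcastSketch();
        s.add("peer-1", new Handler() {
          @Override public void sendOOB() { System.out.println("peer-1 notified"); }
        });
        s.add("peer-2", new Handler() {
          @Override public void sendOOB() throws IOException { throw new IOException("down"); }
        });
        s.broadcast();   // peer-2's failure is logged, peer-1 is still notified
      }
    }
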
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Wed Aug 20 01:34:29 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
 
   public static final String DISK_ERROR = "Possible disk error: ";
 
+  private static final String SEP = System.getProperty("file.separator");
+
   /** Get the cause of an I/O exception if caused by a possible disk error
    * @param ioe an I/O exception
    * @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
   public static File getUnlinkTmpFile(File f) {
     return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
   }
+
+  /**
+   * Checks whether there are any files anywhere in the directory tree rooted
+   * at dir (directories don't count as files). dir must exist.
+   * @param dir the root of the directory tree to check; must exist
+   * @return true if there are no files anywhere under dir
+   * @throws IOException if unable to list subdirectories
+   */
+  public static boolean dirNoFilesRecursive(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents == null) {
+      throw new IOException("Cannot list contents of " + dir);
+    }
+    for (File f : contents) {
+      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the directory where a finalized block with this ID should be stored.
+   * Do not attempt to create the directory.
+   * @param root the root directory where finalized blocks are stored
+   * @param blockId the ID of the block
+   * @return the two-level subdirectory under root derived from the block ID
+   */
+  public static File idToBlockDir(File root, long blockId) {
+    int d1 = (int)((blockId >> 16) & 0xff);
+    int d2 = (int)((blockId >> 8) & 0xff);
+    String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+    return new File(root, path);
+  }
 }

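idToBlockDir() derives a fixed two-level directory from bits 8-23 of the block ID: bits 16-23 pick the first-level subdirectory and bits 8-15 the second, giving at most 256 x 256 leaf directories regardless of how many blocks a volume holds. A quick standalone check of that arithmetic; the "subdir" prefix is an assumed value for DataStorage.BLOCK_SUBDIR_PREFIX, and the sample path is illustrative.

    import java.io.File;

    public class IdToBlockDirSketch {
      private static final String SEP = File.separator;
      private static final String BLOCK_SUBDIR_PREFIX = "subdir"; // assumed prefix

      // Same arithmetic as DatanodeUtil.idToBlockDir() in the patch above.
      static File idToBlockDir(File root, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0xff);
        int d2 = (int) ((blockId >> 8) & 0xff);
        String path = BLOCK_SUBDIR_PREFIX + d1 + SEP + BLOCK_SUBDIR_PREFIX + d2;
        return new File(root, path);
      }

      public static void main(String[] args) {
        File root = new File("/data/current/finalized");
        // 0x12345678: bits 16-23 are 0x34 (52), bits 8-15 are 0x56 (86),
        // so this prints .../finalized/subdir52/subdir86 with a Unix separator.
        System.out.println(idToBlockDir(root, 0x12345678L));
        // Consecutive block IDs stay in the same leaf until bit 8 flips:
        System.out.println(idToBlockDir(root, 0x12345678L + 256)); // .../subdir52/subdir87
      }
    }
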
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Aug 20 01:34:29 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
   private File baseDir;
   
   /**
-   * Ints representing the sub directory path from base dir to the directory
-   * containing this replica.
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
    */
-  private int[] subDirs;
+  private boolean hasSubdirs;
   
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
    * @return the parent directory path where this replica is located
    */
   File getDir() {
-    if (subDirs == null) {
-      return null;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    for (int i : subDirs) {
-      sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
-      sb.append(i);
-      sb.append("/");
-    }
-    File ret = new File(baseDir, sb.toString());
-    return ret;
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
   }
 
   /**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
 
   private void setDirInternal(File dir) {
     if (dir == null) {
-      subDirs = null;
       baseDir = null;
       return;
     }
 
-    ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
-    this.subDirs = replicaDirInfo.subDirs;
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubdirs;
     
     synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
         // Create a new String path of this file and make a brand new File object
         // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(replicaDirInfo.baseDirPath);
-        internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
       }
-      this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
     }
   }
-  
+
   @VisibleForTesting
   public static class ReplicaDirInfo {
-    @VisibleForTesting
     public String baseDirPath;
-    
-    @VisibleForTesting
-    public int[] subDirs;
+    public boolean hasSubdirs;
+
+    public ReplicaDirInfo(String baseDirPath, boolean hasSubdirs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubdirs = hasSubdirs;
+    }
   }
   
   @VisibleForTesting
-  public static ReplicaDirInfo parseSubDirs(File dir) {
-    ReplicaDirInfo ret = new ReplicaDirInfo();
+  public static ReplicaDirInfo parseBaseDir(File dir) {
     
     File currentDir = dir;
-    List<Integer> subDirList = new ArrayList<Integer>();
+    boolean hasSubdirs = false;
     while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      // Prepend the integer into the list.
-      subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
-          DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+      hasSubdirs = true;
       currentDir = currentDir.getParentFile();
     }
-    ret.subDirs = new int[subDirList.size()];
-    for (int i = 0; i < subDirList.size(); i++) {
-      ret.subDirs[i] = subDirList.get(i);
-    }
-    
-    ret.baseDirPath = currentDir.getAbsolutePath();
     
-    return ret;
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
   }
 
   /**

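ReplicaInfo no longer records the chain of subdirectory indices for each replica; since the new layout places a block at a position computed purely from its ID, it is enough to intern the base directory and remember a single boolean. A small sketch of the parent walk performed by parseBaseDir(), with an illustrative path and the "subdir" prefix assumed.

    import java.io.File;

    public class ParseBaseDirSketch {
      private static final String PREFIX = "subdir"; // assumed DataStorage.BLOCK_SUBDIR_PREFIX

      public static void main(String[] args) {
        // A replica directory under the new two-level layout (illustrative path).
        File replicaDir = new File("/data/current/finalized/subdir52/subdir86");

        // Walk up parent directories while they look like block subdirs,
        // as parseBaseDir() above does, keeping only a boolean.
        File current = replicaDir;
        boolean hasSubdirs = false;
        while (current.getName().startsWith(PREFIX)) {
          hasSubdirs = true;
          current = current.getParentFile();
        }

        System.out.println("baseDir = " + current.getAbsolutePath());
        System.out.println("hasSubdirs = " + hasSubdirs);
        // Because the layout is a pure function of the block ID, getDir() can
        // recompute the full path via idToBlockDir(baseDir, blockId) instead of
        // storing an int[] of subdirectory indices per replica.
      }
    }
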
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Wed Aug 20 01:34:29 2014
@@ -74,7 +74,7 @@ import com.google.common.collect.HashMul
  * DN also marks the block's slots as "unanchorable" to prevent additional 
  * clients from initiating these operations in the future.
  * 
- * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ * The counterpart of this class on the client is {@link DfsClientShmManager}.
  */
 public class ShortCircuitRegistry {
   public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@@ -217,7 +217,32 @@ public class ShortCircuitRegistry {
     }
     return allowMunlock;
   }
-  
+
+  /**
+   * Invalidate any slot associated with a blockId that we are invalidating
+   * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
+   * not use the corresponding replica for new read or mmap operations (although
+   * existing, ongoing read or mmap operations will complete.)
+   *
+   * @param blockId        The block ID.
+   */
+  public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
+    if (!enabled) return;
+    final Set<Slot> affectedSlots = slots.get(blockId);
+    if (!affectedSlots.isEmpty()) {
+      final StringBuilder bld = new StringBuilder();
+      String prefix = "";
+      bld.append("Block ").append(blockId).append(" has been invalidated.  ").
+          append("Marking short-circuit slots as invalid: ");
+      for (Slot slot : affectedSlots) {
+        slot.makeInvalid();
+        bld.append(prefix).append(slot.toString());
+        prefix = ", ";
+      }
+      LOG.info(bld.toString());
+    }
+  }
+
   public static class NewShmInfo implements Closeable {
     public final ShmId shmId;
     public final FileInputStream stream;

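processBlockInvalidation() looks up every short-circuit slot registered for the deleted block, flips each one to invalid so DFSClients stop using the cached file descriptors, and reports them all in a single log line. Below is a self-contained sketch of that loop, with a hypothetical Slot class and a plain Map standing in for the registry's multimap.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class InvalidateSlotsSketch {
      // Hypothetical stand-in for a short-circuit slot.
      static class Slot {
        private final int id;
        private boolean valid = true;
        Slot(int id) { this.id = id; }
        void makeInvalid() { valid = false; }
        @Override public String toString() { return "Slot(" + id + ", valid=" + valid + ")"; }
      }

      public static void main(String[] args) {
        // blockId -> slots handed out to clients for that block.
        Map<Long, List<Slot>> slots = new HashMap<Long, List<Slot>>();
        List<Slot> forBlock = new ArrayList<Slot>();
        forBlock.add(new Slot(1));
        forBlock.add(new Slot(2));
        slots.put(1073741825L, forBlock);

        // Same shape as processBlockInvalidation(): mark every slot for the
        // deleted block invalid and report them in one log line.
        long blockId = 1073741825L;
        List<Slot> affected = slots.get(blockId);
        if (affected != null && !affected.isEmpty()) {
          StringBuilder bld = new StringBuilder();
          String prefix = "";
          bld.append("Block ").append(blockId)
             .append(" has been invalidated.  Marking slots as invalid: ");
          for (Slot s : affected) {
            s.makeInvalid();
            bld.append(prefix).append(s);
            prefix = ", ";
          }
          System.out.println(bld);
        }
      }
    }
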
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Wed Aug 20 01:34:29 2014
@@ -78,7 +78,7 @@ public class StorageLocation {
    * @return A StorageLocation object if successfully parsed, null otherwise.
    *         Does not throw any exceptions.
    */
-  static StorageLocation parse(String rawLocation)
+  public static StorageLocation parse(String rawLocation)
       throws IOException, SecurityException {
     Matcher matcher = regex.matcher(rawLocation);
     StorageType storageType = StorageType.DEFAULT;

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Aug 20 01:34:29 2014
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends 
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** Add a collection of StorageLocations to FsDataset. */
+  public void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException;
+
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Wed Aug 20 01:34:29 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
   private final String bpid;
   private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
   private final File currentDir; // StorageDirectory/current/bpid/current
-  private final LDir finalizedDir; // directory store Finalized replica
+  // directory where finalized replicas are stored
+  private final File finalizedDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
@@ -82,8 +83,13 @@ class BlockPoolSlice {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
-    final File finalizedDir = new File(
+    this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    if (!this.finalizedDir.exists()) {
+      if (!this.finalizedDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.finalizedDir);
+      }
+    }
 
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
@@ -95,10 +101,6 @@ class BlockPoolSlice {
       FileUtil.fullyDelete(tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
-    final int maxBlocksPerDir = conf.getInt(
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-    this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
     if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
       if (!rbwDir.isDirectory()) {
         throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -131,7 +133,7 @@ class BlockPoolSlice {
   }
 
   File getFinalizedDir() {
-    return finalizedDir.dir;
+    return finalizedDir;
   }
   
   File getRbwDir() {
@@ -239,26 +241,57 @@ class BlockPoolSlice {
   }
 
   File addBlock(Block b, File f) throws IOException {
-    File blockFile = finalizedDir.addBlock(b, f);
+    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    if (!blockDir.exists()) {
+      if (!blockDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + blockDir);
+      }
+    }
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
     
   void checkDirs() throws DiskErrorException {
-    finalizedDir.checkDirTree();
+    DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
     
   void getVolumeMap(ReplicaMap volumeMap) throws IOException {
     // add finalized replicas
-    finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+    addToReplicasMap(volumeMap, finalizedDir, true);
     // add rbw replicas
     addToReplicasMap(volumeMap, rbwDir, false);
   }
 
   /**
+   * Recover an unlinked tmp file on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name and the original name is returned; otherwise the tmp
+   * file is deleted and null is returned.
+   */
+  File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+    File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+    if (blockFile.exists()) {
+      // If the original block file still exists, then no recovery is needed.
+      if (!unlinkedTmp.delete()) {
+        throw new IOException("Unable to cleanup unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return null;
+    } else {
+      if (!unlinkedTmp.renameTo(blockFile)) {
+        throw new IOException("Unable to rename unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return blockFile;
+    }
+  }
+
+
+  /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
    * @param dir an input directory
@@ -267,23 +300,34 @@ class BlockPoolSlice {
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
       ) throws IOException {
-    File blockFiles[] = FileUtil.listFiles(dir);
-    for (File blockFile : blockFiles) {
-      if (!Block.isBlockFilename(blockFile))
+    File files[] = FileUtil.listFiles(dir);
+    for (File file : files) {
+      if (file.isDirectory()) {
+        addToReplicasMap(volumeMap, file, isFinalized);
+      }
+
+      if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+        file = recoverTempUnlinkedBlock(file);
+        if (file == null) { // the original block still exists, so we cover it
+          // in another iteration and can continue here
+          continue;
+        }
+      }
+      if (!Block.isBlockFilename(file))
         continue;
       
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          blockFiles, blockFile);
-      long blockId = Block.filename2id(blockFile.getName());
+          files, file);
+      long blockId = Block.filename2id(file.getName());
       ReplicaInfo newReplica = null;
       if (isFinalized) {
         newReplica = new FinalizedReplica(blockId, 
-            blockFile.length(), genStamp, volume, blockFile.getParentFile());
+            file.length(), genStamp, volume, file.getParentFile());
       } else {
 
         boolean loadRwr = true;
-        File restartMeta = new File(blockFile.getParent()  +
-            File.pathSeparator + "." + blockFile.getName() + ".restart");
+        File restartMeta = new File(file.getParent()  +
+            File.pathSeparator + "." + file.getName() + ".restart");
         Scanner sc = null;
         try {
           sc = new Scanner(restartMeta);
@@ -291,8 +335,8 @@ class BlockPoolSlice {
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
             newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(blockFile, genStamp), 
-                genStamp, volume, blockFile.getParentFile(), null);
+                validateIntegrityAndSetLength(file, genStamp),
+                genStamp, volume, file.getParentFile(), null);
             loadRwr = false;
           }
           sc.close();
@@ -301,7 +345,7 @@ class BlockPoolSlice {
               restartMeta.getPath());
           }
         } catch (FileNotFoundException fnfe) {
-          // nothing to do here
+          // nothing to do here
         } finally {
           if (sc != null) {
             sc.close();
@@ -310,15 +354,15 @@ class BlockPoolSlice {
         // Restart meta doesn't exist or expired.
         if (loadRwr) {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(blockFile, genStamp), 
-              genStamp, volume, blockFile.getParentFile());
+              validateIntegrityAndSetLength(file, genStamp),
+              genStamp, volume, file.getParentFile());
         }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
       if (oldReplica != null) {
         FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+            "on disk: " + oldReplica.getBlockFile() + " and " + file );
       }
     }
   }
@@ -405,10 +449,6 @@ class BlockPoolSlice {
     }
   }
     
-  void clearPath(File f) {
-    finalizedDir.clearPath(f);
-  }
-    
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

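With the LDir tree gone, BlockPoolSlice scans the finalized directory itself: addToReplicasMap() now recurses into any subdirectory it meets, so replicas are found whether they sit flat (old layout) or two subdirectory levels down (block ID-based layout). A rough sketch of that recursive walk follows; the blk_ name check is an illustrative stand-in for Block.isBlockFilename().

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class RecursiveScanSketch {
      // Collects every file named like a block file under dir, descending into
      // subdirectories the way the reworked addToReplicasMap() does.
      static void collectBlocks(File dir, List<File> out) {
        File[] files = dir.listFiles();
        if (files == null) {
          return;   // unreadable or not a directory
        }
        for (File f : files) {
          if (f.isDirectory()) {
            collectBlocks(f, out);   // recurse into subdirNN/subdirNN
          } else if (f.getName().startsWith("blk_") && !f.getName().endsWith(".meta")) {
            out.add(f);
          }
        }
      }

      public static void main(String[] args) {
        // Point this at a finalized directory; with the ID-based layout the block
        // files sit two subdir levels below it rather than directly inside it.
        List<File> blocks = new ArrayList<File>();
        collectBlocks(new File(args.length > 0 ? args[0] : "."), blocks);
        System.out.println("Found " + blocks.size() + " block files");
      }
    }
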
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Wed Aug 20 01:34:29 2014
@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
   
   private final DataNode datanode;
+  private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
   
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
    * 
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
-   * 
-   * @param volumes The roots of the data volumes.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+  FsDatasetAsyncDiskService(DataNode datanode) {
     this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+      int counter = 0;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        int thisIndex;
+        synchronized (this) {
+          thisIndex = counter++;
+        }
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async disk worker #" + thisIndex +
+            " for volume " + volume);
+        return t;
+      }
+    };
 
-    final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
-    // Create one ThreadPool per volume
-    for (int v = 0 ; v < volumes.length; v++) {
-      final File vol = volumes[v];
-      ThreadFactory threadFactory = new ThreadFactory() {
-          int counter = 0;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            int thisIndex;
-            synchronized (this) {
-              thisIndex = counter++;
-            }
-            Thread t = new Thread(threadGroup, r);
-            t.setName("Async disk worker #" + thisIndex +
-                      " for volume " + vol);
-            return t;
-          }
-        };
-
-      ThreadPoolExecutor executor = new ThreadPoolExecutor(
-          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, 
-          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
-          new LinkedBlockingQueue<Runnable>(), threadFactory);
-
-      // This can reduce the number of running threads
-      executor.allowCoreThreadTimeOut(true);
-      executors.put(vol, executor);
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncDiskService for a new volume
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " is already existed.");
+    }
+    addExecutorForVolume(volume);
   }
   
   synchronized long countPendingDeletions() {

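FsDatasetAsyncDiskService no longer takes the full volume list up front; volumes register themselves one at a time, each getting its own small thread pool whose workers are named after the volume and which can shrink to zero threads when idle. A rough sketch of that per-volume executor registry; pool sizes, keep-alive, and thread names are illustrative.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PerVolumeExecutorsSketch {
      private final Map<String, ThreadPoolExecutor> executors =
          new HashMap<String, ThreadPoolExecutor>();

      // One pool per volume, added as volumes appear (mirrors addVolume()).
      synchronized void addVolume(final String volume) {
        if (executors.containsKey(volume)) {
          throw new IllegalStateException("Volume " + volume + " already exists.");
        }
        ThreadFactory factory = new ThreadFactory() {
          private int counter = 0;
          @Override
          public synchronized Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("Async disk worker #" + counter++ + " for volume " + volume);
            return t;
          }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 4, 60L, TimeUnit.SECONDS,            // illustrative pool sizing
            new LinkedBlockingQueue<Runnable>(), factory);
        executor.allowCoreThreadTimeOut(true);      // idle pools shrink to zero threads
        executors.put(volume, executor);
      }

      synchronized void execute(String volume, Runnable task) {
        executors.get(volume).execute(task);
      }

      public static void main(String[] args) {
        PerVolumeExecutorsSketch svc = new PerVolumeExecutorsSketch();
        svc.addVolume("/data1");
        svc.execute("/data1", new Runnable() {
          @Override public void run() { System.out.println(Thread.currentThread().getName()); }
        });
        svc.executors.get("/data1").shutdown();   // let the worker finish, then exit
      }
    }
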
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Aug 20 01:34:29 2014
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -201,6 +202,7 @@ class FsDatasetImpl implements FsDataset
   final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
+  private final Configuration conf;
   private final int validVolsRequired;
 
   final ReplicaMap volumeMap;
@@ -215,6 +217,7 @@ class FsDatasetImpl implements FsDataset
       ) throws IOException {
     this.datanode = datanode;
     this.dataStorage = storage;
+    this.conf = conf;
     // The number of volumes required for operation is the total number 
     // of volumes minus the number of failed volumes we can tolerate.
     final int volFailuresTolerated =
@@ -241,38 +244,76 @@ class FsDatasetImpl implements FsDataset
     }
 
     storageMap = new HashMap<String, DatanodeStorage>();
-    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
-        storage.getNumStorageDirs());
-    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = storage.getStorageDir(idx);
-      final File dir = sd.getCurrentDir();
-      final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
-      volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
-          storageType));
-      LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
-      storageMap.put(sd.getStorageUuid(),
-          new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
-    }
     volumeMap = new ReplicaMap(this);
-
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
-    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.initializeReplicaMaps(volumeMap);
+    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
 
-    File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+      addVolume(dataLocations, storage.getStorageDir(idx));
     }
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+
     cacheManager = new FsDatasetCache(this);
     registerMBean(datanode.getDatanodeUuid());
   }
 
+  private void addVolume(Collection<StorageLocation> dataLocations,
+      Storage.StorageDirectory sd) throws IOException {
+    final File dir = sd.getCurrentDir();
+    final StorageType storageType =
+        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+    // If an IOException is thrown by FsVolumeImpl() or getVolumeMap(), there
+    // is nothing that needs to be rolled back to keep data structures such as
+    // storageMap and asyncDiskService consistent.
+    FsVolumeImpl fsVolume = new FsVolumeImpl(
+        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    fsVolume.getVolumeMap(volumeMap);
+
+    volumes.addVolume(fsVolume);
+    storageMap.put(sd.getStorageUuid(),
+        new DatanodeStorage(sd.getStorageUuid(),
+                            DatanodeStorage.State.NORMAL,
+                            storageType));
+    asyncDiskService.addVolume(sd.getCurrentDir());
+
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+  }
+
+  /**
+   * Add a collection of StorageLocations to FsDataset.
+   *
+   * @pre dataStorage must already contain these volumes.
+   * @param volumes the storage locations to add
+   * @throws IOException
+   */
+  @Override
+  public synchronized void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException {
+    final Collection<StorageLocation> dataLocations =
+        DataNode.getStorageLocations(this.conf);
+    Map<String, Storage.StorageDirectory> allStorageDirs =
+        new HashMap<String, Storage.StorageDirectory>();
+    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+      allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+    }
+
+    for (StorageLocation vol : volumes) {
+      String key = vol.getFile().getAbsolutePath();
+      if (!allStorageDirs.containsKey(key)) {
+        LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+      } else {
+        addVolume(dataLocations, allStorageDirs.get(key));
+      }
+    }
+  }
+
   private StorageType getStorageTypeFromLocations(
       Collection<StorageLocation> dataLocations, File dir) {
     for (StorageLocation dataLocation : dataLocations) {
@@ -1150,7 +1191,7 @@ class FsDatasetImpl implements FsDataset
         return f;
    
       // if file is not null, but doesn't exist - possibly disk failed
-      datanode.checkDiskError();
+      datanode.checkDiskErrorAsync();
     }
     
     if (LOG.isDebugEnabled()) {
@@ -1223,17 +1264,17 @@ class FsDatasetImpl implements FsDataset
               +  ". Parent not found for file " + f);
           continue;
         }
-        ReplicaState replicaState = info.getState();
-        if (replicaState == ReplicaState.FINALIZED || 
-            (replicaState == ReplicaState.RUR && 
-                ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == 
-                  ReplicaState.FINALIZED)) {
-          v.clearPath(bpid, parent);
-        }
         volumeMap.remove(bpid, invalidBlks[i]);
       }
+
+      // If a DFSClient has the replica in its cache of short-circuit file
+      // descriptors (and the client is using ShortCircuitShm), invalidate it.
+      datanode.getShortCircuitRegistry().processBlockInvalidation(
+                new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
+
       // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
+
       // Delete the block asynchronously to make sure we can do it fast enough.
       // It's ok to unlink the block file before the uncache operation
       // finishes.

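The FsDatasetImpl change above folds per-volume setup into a private addVolume() helper so the same path serves both datanode startup and the new addVolumes() entry point, which accepts only locations that dataStorage already knows about. A minimal standalone sketch of that "validate against known storage directories, then register" pattern, using a hypothetical VolumeRegistry over plain File handles rather than the real FsVolumeImpl/DatanodeStorage classes:

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class VolumeRegistry {
  // Storage directories discovered at startup, keyed by absolute path.
  private final Map<String, File> knownDirs = new HashMap<String, File>();
  // Volumes that have been fully registered and are in service.
  private final List<File> activeVolumes = new ArrayList<File>();

  VolumeRegistry(Collection<File> storageDirs) {
    for (File dir : storageDirs) {
      knownDirs.put(dir.getAbsolutePath(), dir);
    }
  }

  // Register a single volume. Nothing is mutated until the checks pass, so
  // an IOException leaves the registry in a consistent state.
  private synchronized void addVolume(File dir) throws IOException {
    if (!dir.isDirectory()) {
      throw new IOException("Not a directory: " + dir);
    }
    activeVolumes.add(dir);
  }

  // Dynamic addition: only directories the registry already knows about are
  // accepted; anything else is skipped with a warning, mirroring the check
  // against dataStorage in addVolumes().
  synchronized void addVolumes(Collection<File> requested) throws IOException {
    for (File dir : requested) {
      File known = knownDirs.get(dir.getAbsolutePath());
      if (known == null) {
        System.err.println("Attempt to add an invalid volume: " + dir);
      } else {
        addVolume(known);
      }
    }
  }
}
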
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Wed Aug 20 01:34:29 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp
     // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     bp.addToReplicasMap(volumeMap, dir, isFinalized);
   }
-  
-  void clearPath(String bpid, File f) throws IOException {
-    getBlockPoolSlice(bpid).clearPath(f);
-  }
 
   @Override
   public String toString() {
@@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-    if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+    if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
+        finalizedDir)) {
       return false;
     }
     if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp
       if (!rbwDir.delete()) {
         throw new IOException("Failed to delete " + rbwDir);
       }
-      if (!finalizedDir.delete()) {
+      if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
+          !FileUtil.fullyDelete(finalizedDir)) {
         throw new IOException("Failed to delete " + finalizedDir);
       }
       FileUtil.fullyDelete(tmpDir);

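The FsVolumeImpl hunks replace the flat FileUtil.list(finalizedDir) check with DatanodeUtil.dirNoFilesRecursive(), since the finalized directory can now hold a tree of subdirectories that contain no block files. The DatanodeUtil implementation itself is not part of this diff; the following is only a sketch of what such a recursive "no regular files" check could look like:

import java.io.File;
import java.io.IOException;

final class FinalizedDirCheck {
  // Returns true iff there is no regular file anywhere under dir; empty
  // subdirectory trees are allowed.
  static boolean dirNoFilesRecursive(File dir) throws IOException {
    File[] contents = dir.listFiles();
    if (contents == null) {
      throw new IOException("Cannot list contents of " + dir);
    }
    for (File f : contents) {
      if (f.isFile()) {
        return false;                       // found a block/meta file
      }
      if (f.isDirectory() && !dirNoFilesRecursive(f)) {
        return false;                       // a subdirectory holds a file
      }
    }
    return true;
  }
}
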
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Wed Aug 20 01:34:29 2014
@@ -40,9 +40,8 @@ class FsVolumeList {
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private volatile int numFailedVolumes;
 
-  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+  FsVolumeList(int failedVols,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
-    this.volumes = Collections.unmodifiableList(volumes);
     this.blockChooser = blockChooser;
     this.numFailedVolumes = failedVols;
   }
@@ -101,12 +100,6 @@ class FsVolumeList {
     }
     return remaining;
   }
-    
-  void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
-    for (FsVolumeImpl v : volumes) {
-      v.getVolumeMap(globalReplicaMap);
-    }
-  }
   
   void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
     long totalStartTime = Time.monotonicNow();
@@ -205,6 +198,19 @@ class FsVolumeList {
     return volumes.toString();
   }
 
+  /**
+   * Dynamically add a new volume to the set of volumes that this DN manages.
+   * @param newVolume the new FsVolumeImpl instance.
+   */
+  synchronized void addVolume(FsVolumeImpl newVolume) {
+    // Copy the current volume list so the new volume can be appended.
+    final List<FsVolumeImpl> volumeList = volumes == null ?
+        new ArrayList<FsVolumeImpl>() :
+        new ArrayList<FsVolumeImpl>(volumes);
+    volumeList.add(newVolume);
+    volumes = Collections.unmodifiableList(volumeList);
+    FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+  }
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();

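FsVolumeList now builds its immutable volume list incrementally: addVolume() copies the published list, appends the new volume, and republishes an unmodifiable copy under synchronization. A small generic sketch of that copy-on-write publication pattern, assuming (as an illustration, not a statement about the real field) that the list reference is read without locking:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CopyOnWriteList<V> {
  // Readers iterate this reference without locking; writers replace it
  // wholesale while holding the instance lock.
  private volatile List<V> items = Collections.emptyList();

  synchronized void add(V newItem) {
    List<V> copy = new ArrayList<V>(items);     // copy the published list
    copy.add(newItem);
    items = Collections.unmodifiableList(copy); // publish atomically
  }

  List<V> snapshot() {
    return items;
  }
}
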
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Aug 20 01:34:29 2014
@@ -764,8 +764,6 @@ public class FSDirectory implements Clos
     checkSnapshot(srcInode, null);
   }
 
-
-
   private class RenameOperation {
     private final INodesInPath srcIIP;
     private final INodesInPath dstIIP;
@@ -798,7 +796,7 @@ public class FSDirectory implements Clos
       // snapshot is taken on the dst tree, changes will be recorded in the latest
       // snapshot of the src tree.
       if (isSrcInSnapshot) {
-        srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
+        srcChild.recordModification(srcIIP.getLatestSnapshotId());
       }
 
       // check srcChild for reference
@@ -928,8 +926,7 @@ public class FSDirectory implements Clos
       updateCount(iip, 0, dsDelta, true);
     }
 
-    file = file.setFileReplication(replication, iip.getLatestSnapshotId(),
-        inodeMap);
+    file.setFileReplication(replication, iip.getLatestSnapshotId());
     
     final short newBR = file.getBlockReplication(); 
     // check newBR < oldBR case. 
@@ -1081,9 +1078,6 @@ public class FSDirectory implements Clos
       count++;
     }
     
-    // update inodeMap
-    removeFromInodeMap(Arrays.asList(allSrcInodes));
-    
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
@@ -1215,8 +1209,7 @@ public class FSDirectory implements Clos
 
     // record modification
     final int latestSnapshot = iip.getLatestSnapshotId();
-    targetNode = targetNode.recordModification(latestSnapshot);
-    iip.setLastINode(targetNode);
+    targetNode.recordModification(latestSnapshot);
 
     // Remove the node from the namespace
     long removed = removeLastINode(iip);
@@ -2125,7 +2118,7 @@ public class FSDirectory implements Clos
       }
 
       final int latest = iip.getLatestSnapshotId();
-      dirNode = dirNode.recordModification(latest);
+      dirNode.recordModification(latest);
       dirNode.setQuota(nsQuota, dsQuota);
       return dirNode;
     }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Aug 20 01:34:29 2014
@@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
@@ -83,9 +85,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
-
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.BufferedWriter;
@@ -231,6 +230,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -2516,7 +2516,7 @@ public class FSNamesystem implements Nam
                                    boolean writeToEditLog,
                                    int latestSnapshot, boolean logRetryCache)
       throws IOException {
-    file = file.recordModification(latestSnapshot);
+    file.recordModification(latestSnapshot);
     final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
 
     leaseManager.addLease(cons.getFileUnderConstructionFeature()
@@ -3720,8 +3720,10 @@ public class FSNamesystem implements Nam
       StandbyException, IOException {
     FSPermissionChecker pc = getPermissionChecker();  
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
@@ -4209,7 +4211,7 @@ public class FSNamesystem implements Nam
     Preconditions.checkArgument(uc != null);
     leaseManager.removeLease(uc.getClientName(), src);
     
-    pendingFile = pendingFile.recordModification(latestSnapshot);
+    pendingFile.recordModification(latestSnapshot);
 
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
@@ -4298,7 +4300,30 @@ public class FSNamesystem implements Nam
           throw new IOException("Block (=" + lastblock + ") not found");
         }
       }
-      INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
+      //
+      // The delete operation (see @deleteInternal) first removes the file
+      // paths from the namespace and delays the removal of blocks until a
+      // later time for better performance. If commitBlockSynchronization
+      // (this method) is called in between, the blockCollection of
+      // storedBlock may already have been set to null by the delete
+      // operation, so throw an IOException here instead of hitting an NPE.
+      // If the file path has already been removed from the namespace by the
+      // delete operation, throw a FileNotFoundException instead, so we do
+      // not proceed to the end of this method and add a CloseOp to the edit
+      // log for an already deleted file (see HDFS-6825).
+      //
+      BlockCollection blockCollection = storedBlock.getBlockCollection();
+      if (blockCollection == null) {
+        throw new IOException("The blockCollection of " + storedBlock
+            + " is null, likely because the file owning this block was"
+            + " deleted and the block removal is delayed");
+      }
+      INodeFile iFile = ((INode)blockCollection).asFile();
+      if (isFileDeleted(iFile)) {
+        throw new FileNotFoundException("File not found: "
+            + iFile.getFullPathName() + ", likely due to delayed block"
+            + " removal");
+      }
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Unexpected block (=" + lastblock
@@ -4353,8 +4378,11 @@ public class FSNamesystem implements Nam
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
           for (int i = 0; i < trimmedTargets.size(); i++) {
-            trimmedTargets.get(i).addBlock(
-              trimmedStorages.get(i), storedBlock);
+            DatanodeStorageInfo storageInfo =
+                trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+            if (storageInfo != null) {
+              storageInfo.addBlock(storedBlock);
+            }
           }
         }
 
@@ -4914,6 +4942,28 @@ public class FSNamesystem implements Nam
     }
   }
 
+  DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
+      ) throws AccessControlException, StandbyException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
+    readLock();
+    try {
+      checkOperation(OperationCategory.UNCHECKED);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
+      final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
+
+      DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
+      for (int i = 0; i < reports.length; i++) {
+        final DatanodeDescriptor d = datanodes.get(i);
+        reports[i] = new DatanodeStorageReport(new DatanodeInfo(d),
+            d.getStorageReports());
+      }
+      return reports;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Save namespace image.
    * This will save current namespace into fsimage file and empty edits file.
@@ -5811,7 +5861,7 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
+      final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {
@@ -6061,7 +6111,6 @@ public class FSNamesystem implements Nam
       blockManager.shutdown();
     }
   }
-  
 
   @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
@@ -6109,6 +6158,15 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Storages are marked as "content stale" after the NN restarts or fails
+   * over, and before the NN receives the first heartbeat followed by the
+   * first block report.
+   */
+  @Override // FSNamesystemMBean
+  public int getNumStaleStorages() {
+    return getBlockManager().getDatanodeManager().getNumStaleStorages();
+  }
+
+  /**
    * Sets the current generation stamp for legacy blocks
    */
   void setGenerationStampV1(long stamp) {
@@ -6262,9 +6320,28 @@ public class FSNamesystem implements Nam
 
   private boolean isFileDeleted(INodeFile file) {
     // Not in the inodeMap or in the snapshot but marked deleted.
-    if (dir.getInode(file.getId()) == null || 
-        file.getParent() == null || (file.isWithSnapshot() &&
-        file.getFileWithSnapshotFeature().isCurrentFileDeleted())) {
+    if (dir.getInode(file.getId()) == null) {
+      return true;
+    }
+
+    // Walk up the path hierarchy to see whether any ancestor has been removed
+    // by a recursive deletion.
+    INode tmpChild = file;
+    INodeDirectory tmpParent = file.getParent();
+    while (true) {
+      if (tmpParent == null ||
+          tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
+        return true;
+      }
+      if (tmpParent.isRoot()) {
+        break;
+      }
+      tmpChild = tmpParent;
+      tmpParent = tmpParent.getParent();
+    }
+
+    if (file.isWithSnapshot() &&
+        file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
       return true;
     }
     return false;
@@ -8183,9 +8260,11 @@ public class FSNamesystem implements Nam
     nnConf.checkAclsConfigFlag();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
       checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null);
       }
@@ -8288,8 +8367,10 @@ public class FSNamesystem implements Nam
       }
     }
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.READ);
@@ -8333,8 +8414,10 @@ public class FSNamesystem implements Nam
     nnConf.checkXAttrsConfigFlag();
     final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         /* To access xattr names, you need EXECUTE in the owning directory. */
@@ -8428,6 +8511,29 @@ public class FSNamesystem implements Nam
     }
   }
 
+  void checkAccess(String src, FsAction mode) throws AccessControlException,
+      FileNotFoundException, UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (dir.getINode(src) == null) {
+        throw new FileNotFoundException("Path not found");
+      }
+      if (isPermissionEnabled) {
+        FSPermissionChecker pc = getPermissionChecker();
+        checkPathAccess(pc, src, mode);
+      }
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "checkAccess", src);
+      throw e;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the

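The reworked isFileDeleted() no longer trusts file.getParent() alone: it walks up the path hierarchy and treats the file as deleted if any ancestor no longer lists the child on the way back down, which is exactly the state left behind by a recursive delete whose block removal is still pending. A stripped-down sketch of that ancestor walk over a hypothetical Node type (the real code compares name bytes via INodeDirectory.searchChildren()):

import java.util.ArrayList;
import java.util.List;

final class Node {
  Node parent;
  String name;
  List<Node> children = new ArrayList<Node>();

  // A node counts as deleted if some ancestor no longer contains the child
  // on the path back down to it (e.g. after a recursive delete), or if it
  // has been detached from the tree entirely.
  static boolean isDeleted(Node node) {
    Node child = node;
    Node parent = node.parent;
    while (true) {
      if (parent == null || !parent.children.contains(child)) {
        return true;                 // a link on the way up is broken
      }
      if (parent.parent == null) {
        return false;                // reached the root; every link held
      }
      child = parent;
      parent = parent.parent;
    }
  }
}
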
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Aug 20 01:34:29 2014
@@ -97,9 +97,9 @@ public abstract class INode implements I
   /** Set user */
   final INode setUser(String user, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setUser(user);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setUser(user);
+    return this;
   }
   /**
    * @param snapshotId
@@ -122,9 +122,9 @@ public abstract class INode implements I
   /** Set group */
   final INode setGroup(String group, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setGroup(group);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setGroup(group);
+    return this;
   }
 
   /**
@@ -148,9 +148,9 @@ public abstract class INode implements I
   /** Set the {@link FsPermission} of this {@link INode} */
   INode setPermission(FsPermission permission, int latestSnapshotId) 
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setPermission(permission);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setPermission(permission);
+    return this;
   }
 
   abstract AclFeature getAclFeature(int snapshotId);
@@ -164,18 +164,18 @@ public abstract class INode implements I
 
   final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.addAclFeature(aclFeature);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    addAclFeature(aclFeature);
+    return this;
   }
 
   abstract void removeAclFeature();
 
   final INode removeAclFeature(int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.removeAclFeature();
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    removeAclFeature();
+    return this;
   }
 
   /**
@@ -199,9 +199,9 @@ public abstract class INode implements I
   
   final INode addXAttrFeature(XAttrFeature xAttrFeature, int latestSnapshotId) 
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.addXAttrFeature(xAttrFeature);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    addXAttrFeature(xAttrFeature);
+    return this;
   }
   
   /**
@@ -211,9 +211,9 @@ public abstract class INode implements I
   
   final INode removeXAttrFeature(int lastestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(lastestSnapshotId);
-    nodeToUpdate.removeXAttrFeature();
-    return nodeToUpdate;
+    recordModification(lastestSnapshotId);
+    removeXAttrFeature();
+    return this;
   }
   
   /**
@@ -298,11 +298,8 @@ public abstract class INode implements I
    * @param latestSnapshotId The id of the latest snapshot that has been taken.
    *                         Note that it is {@link Snapshot#CURRENT_STATE_ID} 
    *                         if no snapshots have been taken.
-   * @return The current inode, which usually is the same object of this inode.
-   *         However, in some cases, this inode may be replaced with a new inode
-   *         for maintaining snapshots. The current inode is then the new inode.
    */
-  abstract INode recordModification(final int latestSnapshotId)
+  abstract void recordModification(final int latestSnapshotId)
       throws QuotaExceededException;
 
   /** Check whether it's a reference. */
@@ -652,9 +649,9 @@ public abstract class INode implements I
   /** Set the last modification time of inode. */
   public final INode setModificationTime(long modificationTime,
       int latestSnapshotId) throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setModificationTime(modificationTime);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setModificationTime(modificationTime);
+    return this;
   }
 
   /**
@@ -682,9 +679,9 @@ public abstract class INode implements I
    */
   public final INode setAccessTime(long accessTime, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setAccessTime(accessTime);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setAccessTime(accessTime);
+    return this;
   }
 
 

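Across INode and its subclasses, recordModification() no longer returns a (possibly replaced) inode; the setters simply record the modification against the latest snapshot and then mutate the same object, so callers can drop the reassignment. A compact sketch of the new setter shape, using a hypothetical SnapshottableNode in place of INode:

abstract class SnapshottableNode {
  // Record this node in the latest snapshot's diff list if necessary;
  // after this change it never replaces the node, hence the void return.
  abstract void recordModification(int latestSnapshotId);

  abstract void setOwner(String owner);

  // Snapshot-aware setter: record first, then mutate this node in place.
  final SnapshottableNode setOwner(String owner, int latestSnapshotId) {
    recordModification(latestSnapshotId);
    setOwner(owner);
    return this;   // callers no longer need to reassign the result
  }
}
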
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Aug 20 01:34:29 2014
@@ -157,7 +157,7 @@ public class INodeDirectory extends INod
     return quota;
   }
 
-  private int searchChildren(byte[] name) {
+  int searchChildren(byte[] name) {
     return children == null? -1: Collections.binarySearch(children, name);
   }
   
@@ -318,7 +318,7 @@ public class INodeDirectory extends INod
   }
 
   @Override
-  public INodeDirectory recordModification(int latestSnapshotId) 
+  public void recordModification(int latestSnapshotId)
       throws QuotaExceededException {
     if (isInLatestSnapshot(latestSnapshotId)
         && !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -330,7 +330,6 @@ public class INodeDirectory extends INod
       // record self in the diff list if necessary
       sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
     }
-    return this;
   }
 
   /**

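searchChildren() is widened from private to package-private so that FSNamesystem.isFileDeleted() can probe whether a directory still lists a given child; it is a plain binary search over the sorted children list. A toy equivalent over strings, just to show the contract, is below: a non-negative result is the child's index, and a negative result encodes the insertion point as -(insertionPoint) - 1, meaning the child is absent.

import java.util.Collections;
import java.util.List;

final class ChildSearch {
  // Children are kept sorted by name, so membership is a binary search.
  static int searchChildren(List<String> sortedChildren, String name) {
    return sortedChildren == null ? -1
        : Collections.binarySearch(sortedChildren, name);
  }
}
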
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Aug 20 01:34:29 2014
@@ -284,7 +284,7 @@ public class INodeFile extends INodeWith
   }
 
   @Override
-  public INodeFile recordModification(final int latestSnapshotId) 
+  public void recordModification(final int latestSnapshotId)
       throws QuotaExceededException {
     if (isInLatestSnapshot(latestSnapshotId)
         && !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -296,7 +296,6 @@ public class INodeFile extends INodeWith
       // record self in the diff list if necessary
       sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
     }
-    return this;
   }
   
   public FileDiffList getDiffs() {
@@ -344,11 +343,10 @@ public class INodeFile extends INodeWith
 
   /** Set the replication factor of this file. */
   public final INodeFile setFileReplication(short replication,
-      int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    final INodeFile nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setFileReplication(replication);
-    return nodeToUpdate;
+      int latestSnapshotId) throws QuotaExceededException {
+    recordModification(latestSnapshotId);
+    setFileReplication(replication);
+    return this;
   }
 
   /** @return preferred block size (in bytes) of the file. */

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Wed Aug 20 01:34:29 2014
@@ -93,9 +93,8 @@ public class INodeMap {
         "", "", new FsPermission((short) 0)), 0, 0) {
       
       @Override
-      INode recordModification(int latestSnapshotId)
+      void recordModification(int latestSnapshotId)
           throws QuotaExceededException {
-        return null;
       }
       
       @Override