Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [14/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug 19 23:49:39 2014
@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /** 
  * Data storage information file.
@@ -149,43 +173,99 @@ public class DataStorage extends Storage
   }
   
   /**
-   * Analyze storage directories.
-   * Recover from previous transitions if required. 
-   * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info.
-   * <br>
-   * This method should be synchronized between multiple DN threads.  Only the 
-   * first DN thread does DN level storage dir recoverTransitionRead.
-   * 
+   * @see org.apache.hadoop.hdfs.server.common.Storage#writeAll()
+   */
+  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+    this.layoutVersion = getServiceLayoutVersion();
+    for (StorageDirectory dir : dirs) {
+      writeProperties(dir);
+    }
+  }
+
+  /**
+   * Add a list of volumes to be managed by DataStorage. If the volume is empty,
+   * format it, otherwise recover it from previous transitions if required.
+   *
+   * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(DataNode datanode,
+  synchronized void addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt)
       throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
+    // Similar to recoverTransitionRead, it first ensures the datanode level
+    // format is completed.
+    List<StorageLocation> tmpDataDirs =
+        new ArrayList<StorageLocation>(dataDirs);
+    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    String bpid = nsInfo.getBlockPoolID();
+    for (StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
     }
-    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and name-node layout version: " + nsInfo.getLayoutVersion());
-    
-    // 1. For each data directory calculate its state and 
-    // check whether all is consistent before transitioning.
-    // Format and recover.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(
+          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+          nsInfo.getClusterID());
+    }
+
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpid, bpStorage);
+  }
+
+  /**
+   * Add a list of volumes to be managed by this DataStorage. If the volume is
+   * empty, it formats the volume, otherwise it recovers it from previous
+   * transitions if required.
+   *
+   * If isInitialize is false, only the directories that have finished the
+   * doTransition() process will be added into DataStorage.
+   *
+   * @param datanode the reference to DataNode.
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
+   * @param ignoreExistingDirs if true, return silently instead of failing when
+   *          none of the given directories can be added (e.g. all of them are
+   *          already in use)
+   * @throws IOException
+   */
+  private synchronized void addStorageLocations(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+      throws IOException {
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
+    // 1. For each data directory calculate its state and check whether all is
+    // consistent before transitioning. Format and recover.
+    ArrayList<StorageState> dataDirStates =
+        new ArrayList<StorageState>(dataDirs.size());
+    List<StorageDirectory> addedStorageDirectories =
+        new ArrayList<StorageDirectory>();
     for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next().getFile();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
-        switch(curState) {
+        switch (curState) {
         case NORMAL:
           break;
         case NON_EXISTENT:
@@ -194,7 +274,8 @@ public class DataStorage extends Storage
           it.remove();
           continue;
         case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted");
+          LOG.info("Storage directory " + dataDir + " is not formatted for "
+            + nsInfo.getBlockPoolID());
           LOG.info("Formatting ...");
           format(sd, nsInfo, datanode.getDatanodeUuid());
           break;
@@ -208,28 +289,82 @@ public class DataStorage extends Storage
         //continue with other good dirs
         continue;
       }
-      // add to the storage list
-      addStorageDir(sd);
+      if (isInitialize) {
+        addStorageDir(sd);
+      }
+      addedStorageDirectories.add(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0)  // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+      // none of the data dirs exist
+      if (ignoreExistingDirs) {
+        return;
+      }
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
+    }
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback 
-    // while others could be uptodate for the regular startup.
-    for(int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-      createStorageID(getStorageDir(idx));
+    // During startup some of them can upgrade or rollback
+    // while others could be up-to-date for the regular startup.
+    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+        it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        doTransition(datanode, sd, nsInfo, startOpt);
+        createStorageID(sd);
+      } catch (IOException e) {
+        if (!isInitialize) {
+          sd.unlock();
+          it.remove();
+          continue;
+        }
+        unlockAll();
+        throw e;
+      }
     }
+
+    // 3. Update all successfully loaded storages. Some of them might have just
+    // been formatted.
+    this.writeAll(addedStorageDirectories);
+
+    // 4. Make newly loaded storage directories visible for service.
+    if (!isInitialize) {
+      this.storageDirs.addAll(addedStorageDirectories);
+    }
+  }
+
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required.
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads.  Only the
+   * first DN thread does DN level storage dir recoverTransitionRead.
+   *
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @throws IOException
+   */
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
+      throws IOException {
+    if (initialized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
+    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
     
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
-    
-    // 4. mark DN storage is initialized
+    // mark DN storage is initialized
     this.initialized = true;
   }
 
@@ -256,6 +391,7 @@ public class DataStorage extends Storage
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
     }
+
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -483,7 +619,7 @@ public class DataStorage extends Storage
     
     // do upgrade
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
-      doUpgrade(sd, nsInfo);  // upgrade
+      doUpgrade(datanode, sd, nsInfo);  // upgrade
       return;
     }
     
@@ -518,7 +654,8 @@ public class DataStorage extends Storage
    * @param sd  storage directory
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+      throws IOException {
     // If the existing on-disk layout version supports federation, simply
     // update its layout version.
     if (DataNodeLayoutVersion.supports(
@@ -563,7 +700,8 @@ public class DataStorage extends Storage
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), 
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+        STORAGE_DIR_CURRENT));
     
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -741,22 +879,22 @@ public class DataStorage extends Storage
    *
    * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
-      throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+      File toDir) throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
-      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
       // hardlink rbw blocks in tmpDir/rbw
-      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
           new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
-      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), 
+      linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
           diskLayoutVersion, hardLink);      
       if (fromBbwDir.exists()) {
         /*
@@ -765,15 +903,67 @@ public class DataStorage extends Storage
          * NOT underneath the 'current' directory in those releases.  See
          * HDFS-3731 for details.
          */
-        linkBlocks(fromBbwDir,
+        linkBlocks(datanode, fromBbwDir,
             new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
       }
     } 
     LOG.info( hardLink.linkStats.report() );
   }
+
+  private static class LinkArgs {
+    public File src;
+    public File dst;
+
+    public LinkArgs(File src, File dst) {
+      this.src = src;
+      this.dst = dst;
+    }
+  }
+
+  static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+      HardLink hl) throws IOException {
+    boolean upgradeToIdBasedLayout = false;
+    // If we are upgrading from a version older than the one where we introduced
+    // block ID-based layout AND we're working with the finalized directory,
+    // we'll need to upgrade from the old flat layout to the block ID-based one
+    if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+        getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+      upgradeToIdBasedLayout = true;
+    }
+
+    final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+    linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+        idBasedLayoutSingleLinks);
+    int numLinkWorkers = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+    ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+    final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+      final int iCopy = i;
+      futures.add(linkWorkers.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          int upperBound = Math.min(iCopy + step,
+              idBasedLayoutSingleLinks.size());
+          for (int j = iCopy; j < upperBound; j++) {
+            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+            NativeIO.link(cur.src, cur.dst);
+          }
+          return null;
+        }
+      }));
+    }
+    linkWorkers.shutdown();
+    for (Future<Void> f : futures) {
+      Futures.get(f, IOException.class);
+    }
+  }
   
-  static void linkBlocks(File from, File to, int oldLV, HardLink hl) 
-  throws IOException {
+  static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+  boolean upgradeToIdBasedLayout, File blockRoot,
+      List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
     if (!from.exists()) {
       return;
     }
@@ -800,9 +990,6 @@ public class DataStorage extends Storage
     // from is a directory
     hl.linkStats.countDirs++;
     
-    if (!to.mkdirs())
-      throw new IOException("Cannot create directory " + to);
-    
     String[] blockNames = from.list(new java.io.FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -810,12 +997,36 @@ public class DataStorage extends Storage
       }
     });
 
+    // If we are upgrading to block ID-based layout, we don't want to recreate
+    // any subdirs from the source that contain blocks, since we have a new
+    // directory structure
+    if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+        BLOCK_SUBDIR_PREFIX)) {
+      if (!to.mkdirs())
+        throw new IOException("Cannot create directory " + to);
+    }
+
     // Block files just need hard links with the same file names
     // but a different directory
     if (blockNames.length > 0) {
-      HardLink.createHardLinkMult(from, blockNames, to);
-      hl.linkStats.countMultLinks++;
-      hl.linkStats.countFilesMultLinks += blockNames.length;
+      if (upgradeToIdBasedLayout) {
+        for (String blockName : blockNames) {
+          long blockId = Block.getBlockId(blockName);
+          File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+          if (!blockLocation.exists()) {
+            if (!blockLocation.mkdirs()) {
+              throw new IOException("Failed to mkdirs " + blockLocation);
+            }
+          }
+          idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+              new File(blockLocation, blockName)));
+          hl.linkStats.countSingleLinks++;
+        }
+      } else {
+        HardLink.createHardLinkMult(from, blockNames, to);
+        hl.linkStats.countMultLinks++;
+        hl.linkStats.countFilesMultLinks += blockNames.length;
+      }
     } else {
       hl.linkStats.countEmptyDirs++;
     }
@@ -829,8 +1040,9 @@ public class DataStorage extends Storage
         }
       });
     for(int i = 0; i < otherNames.length; i++)
-      linkBlocks(new File(from, otherNames[i]), 
-          new File(to, otherNames[i]), oldLV, hl);
+      linkBlocksHelper(new File(from, otherNames[i]),
+          new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+          blockRoot, idBasedLayoutSingleLinks);
   }
 
   /**

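The new linkBlocks() above collects every (source, target) pair into a flat list and fans the actual hard-linking out over a fixed-size thread pool: the list is cut into contiguous slices of step entries and each worker links one slice. Below is a minimal, self-contained sketch of that fan-out pattern; the class and method names are made up for illustration, and java.nio.file.Files.createLink stands in for NativeIO.link.

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelLinkSketch {
      static class LinkArgs {
        final File src, dst;
        LinkArgs(File src, File dst) { this.src = src; this.dst = dst; }
      }

      static void linkAll(final List<LinkArgs> links, int numWorkers)
          throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        // Contiguous slice of the link list handled by each worker thread.
        final int step = links.size() / numWorkers + 1;
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (int i = 0; i < links.size(); i += step) {
          final int start = i;
          futures.add(pool.submit(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
              int end = Math.min(start + step, links.size());
              for (int j = start; j < end; j++) {
                LinkArgs cur = links.get(j);
                // Hard-link dst to src (same effect as NativeIO.link).
                Files.createLink(cur.dst.toPath(), cur.src.toPath());
              }
              return null;
            }
          }));
        }
        pool.shutdown();
        try {
          for (Future<Void> f : futures) {
            f.get();                 // surface any IOException from a worker
          }
        } catch (InterruptedException e) {
          throw new IOException(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        }
      }
    }
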
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Aug 19 23:49:39 2014
@@ -36,28 +36,27 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -83,7 +82,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.net.InetAddresses;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
 
@@ -104,7 +103,8 @@ class DataXceiver extends Receiver imple
   private long opStartTime; //the start time of receiving an Op
   private final InputStream socketIn;
   private OutputStream socketOut;
-
+  private BlockReceiver blockReceiver = null;
+  
   /**
    * Client Name used in previous operation. Not available on first request
    * on the socket.
@@ -160,6 +160,12 @@ class DataXceiver extends Receiver imple
     return socketOut;
   }
 
+  public void sendOOB() throws IOException, InterruptedException {
+    LOG.info("Sending OOB to peer: " + peer);
+    if (blockReceiver != null)
+      blockReceiver.sendOOB();
+  }
+  
   /**
    * Read/write data from/to the DataXceiverServer.
    */
@@ -169,27 +175,14 @@ class DataXceiver extends Receiver imple
     Op op = null;
 
     try {
-      dataXceiverServer.addPeer(peer, Thread.currentThread());
+      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
-      if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
-          !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
-        IOStreamPair encryptedStreams = null;
-        try {
-          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
-              socketIn, datanode.blockPoolTokenSecretManager,
-              dnConf.encryptionAlgorithm);
-        } catch (InvalidMagicNumberException imne) {
-          LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
-              "is running an older version of Hadoop which does not support " +
-              "encryption");
-          return;
-        }
-        input = encryptedStreams.in;
-        socketOut = encryptedStreams.out;
-      }
-      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+        socketIn, datanode.getDatanodeId());
+      input = new BufferedInputStream(saslStreams.in,
+        HdfsConstants.SMALL_BUFFER_SIZE);
+      socketOut = saslStreams.out;
       
       super.initialize(new DataInputStream(input));
       
@@ -261,19 +254,6 @@ class DataXceiver extends Receiver imple
       }
     }
   }
-  
-  /**
-   * Returns InetAddress from peer
-   * The getRemoteAddressString is the form  /ip-address:port
-   * The ip-address is extracted from peer and InetAddress is formed
-   * @param peer
-   * @return
-   * @throws UnknownHostException
-   */
-  private static InetAddress getClientAddress(Peer peer) {
-    return InetAddresses.forString(
-        peer.getRemoteAddressString().split(":")[0].substring(1));
-  }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -552,9 +532,11 @@ class DataXceiver extends Receiver imple
 
   @Override
   public void writeBlock(final ExtendedBlock block,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String clientname,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes, 
       final DatanodeInfo srcDataNode,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -609,7 +591,6 @@ class DataXceiver extends Receiver imple
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
     Socket mirrorSock = null;           // socket to next target
-    BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
@@ -618,12 +599,13 @@ class DataXceiver extends Receiver imple
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = new BlockReceiver(block, in, 
+        blockReceiver = new BlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
             cachingStrategy);
+        
         storageUuid = blockReceiver.getStorageUuid();
       } else {
         storageUuid = datanode.data.recoverClose(
@@ -654,25 +636,20 @@ class DataXceiver extends Receiver imple
           OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
               writeTimeout);
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
-          if (dnConf.encryptDataTransfer &&
-              !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufMirrorOut, unbufMirrorIn,
-                    datanode.blockPoolTokenSecretManager
-                        .generateDataEncryptionKey(block.getBlockPoolId()));
-            
-            unbufMirrorOut = encryptedStreams.out;
-            unbufMirrorIn = encryptedStreams.in;
-          }
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          unbufMirrorOut = saslStreams.out;
+          unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
-          new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
-              clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
-              cachingStrategy);
+          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy);
 
           mirrorOut.flush();
 
@@ -737,7 +714,7 @@ class DataXceiver extends Receiver imple
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets);
+            mirrorAddr, null, targets, false);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -776,6 +753,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
+      blockReceiver = null;
     }
 
     //update metrics
@@ -787,7 +765,8 @@ class DataXceiver extends Receiver imple
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException {
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
     checkAccess(socketOut, true, blk, blockToken,
         Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
     previousOpClientName = clientName;
@@ -796,13 +775,51 @@ class DataXceiver extends Receiver imple
     final DataOutputStream out = new DataOutputStream(
         getOutputStream());
     try {
-      datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+      datanode.transferReplicaForPipelineRecovery(blk, targets,
+          targetStorageTypes, clientName);
       writeResponse(Status.SUCCESS, null, out);
     } finally {
       IOUtils.closeStream(out);
     }
   }
-  
+
+  private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+      long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+      throws IOException {
+    final int bytesPerCRC = checksum.getBytesPerChecksum();
+    final int csize = checksum.getChecksumSize();
+    final byte[] buffer = new byte[4*1024];
+    MessageDigest digester = MD5Hash.getDigester();
+
+    long remaining = requestLength / bytesPerCRC * csize;
+    for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+      toDigest = checksumIn.read(buffer, 0,
+          (int) Math.min(remaining, buffer.length));
+      if (toDigest < 0) {
+        break;
+      }
+      digester.update(buffer, 0, toDigest);
+    }
+    
+    int partialLength = (int) (requestLength % bytesPerCRC);
+    if (partialLength > 0) {
+      byte[] buf = new byte[partialLength];
+      final InputStream blockIn = datanode.data.getBlockInputStream(block,
+          requestLength - partialLength);
+      try {
+        // Get the CRC of the partialLength.
+        IOUtils.readFully(blockIn, buf, 0, partialLength);
+      } finally {
+        IOUtils.closeStream(blockIn);
+      }
+      checksum.update(buf, 0, partialLength);
+      byte[] partialCrc = new byte[csize];
+      checksum.writeValue(partialCrc, 0, true);
+      digester.update(partialCrc);
+    }
+    return new MD5Hash(digester.digest());
+  }
+
   @Override
   public void blockChecksum(final ExtendedBlock block,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +827,32 @@ class DataXceiver extends Receiver imple
         getOutputStream());
     checkAccess(out, true, block, blockToken,
         Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
-    updateCurrentThreadName("Reading metadata for block " + block);
-    final LengthInputStream metadataIn = 
-      datanode.data.getMetaDataInputStream(block);
-    final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
-        metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+    // client side now can specify a range of the block for checksum
+    long requestLength = block.getNumBytes();
+    Preconditions.checkArgument(requestLength >= 0);
+    long visibleLength = datanode.data.getReplicaVisibleLength(block);
+    boolean partialBlk = requestLength < visibleLength;
 
+    updateCurrentThreadName("Reading metadata for block " + block);
+    final LengthInputStream metadataIn = datanode.data
+        .getMetaDataInputStream(block);
+    
+    final DataInputStream checksumIn = new DataInputStream(
+        new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
     updateCurrentThreadName("Getting checksum for block " + block);
     try {
       //read metadata file
-      final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-      final DataChecksum checksum = header.getChecksum(); 
+      final BlockMetadataHeader header = BlockMetadataHeader
+          .readHeader(checksumIn);
+      final DataChecksum checksum = header.getChecksum();
+      final int csize = checksum.getChecksumSize();
       final int bytesPerCRC = checksum.getBytesPerChecksum();
-      final long crcPerBlock = checksum.getChecksumSize() > 0 
-              ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
-              : 0;
-      
-      //compute block checksum
-      final MD5Hash md5 = MD5Hash.digest(checksumIn);
+      final long crcPerBlock = csize <= 0 ? 0 : 
+        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
 
+      final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
+          calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+            : MD5Hash.digest(checksumIn);
       if (LOG.isDebugEnabled()) {
         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +865,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
-          )
+          .setCrcType(PBHelper.convert(checksum.getChecksumType())))
         .build()
         .writeDelimitedTo(out);
       out.flush();
@@ -931,6 +954,7 @@ class DataXceiver extends Receiver imple
 
   @Override
   public void replaceBlock(final ExtendedBlock block,
+      final StorageType storageType, 
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo proxySource) throws IOException {
@@ -966,7 +990,7 @@ class DataXceiver extends Receiver imple
     String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
-    
+    DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     try {
       // get the output stream to the proxy
       final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -981,17 +1005,12 @@ class DataXceiver extends Receiver imple
       OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
       InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      if (dnConf.encryptDataTransfer && 
-          !dnConf.trustedChannelResolver.isTrusted(
-              proxySock.getInetAddress())) {
-        IOStreamPair encryptedStreams =
-            DataTransferEncryptor.getEncryptedStreams(
-                unbufProxyOut, unbufProxyIn,
-                datanode.blockPoolTokenSecretManager
-                    .generateDataEncryptionKey(block.getBlockPoolId()));
-        unbufProxyOut = encryptedStreams.out;
-        unbufProxyIn = encryptedStreams.in;
-      }
+      DataEncryptionKeyFactory keyFactory =
+        datanode.getDataEncryptionKeyFactoryForBlock(block);
+      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+      unbufProxyOut = saslStreams.out;
+      unbufProxyIn = saslStreams.in;
       
       proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1021,15 +1040,15 @@ class DataXceiver extends Receiver imple
       DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
           checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(
-          block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+      blockReceiver = new BlockReceiver(block, storageType,
+          proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
           CachingStrategy.newDropBehind());
 
       // receive a block
-      blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, null);
+      blockReceiver.receiveBlock(null, null, replyOut, null, 
+          dataXceiverServer.balanceThrottler, null, true);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(
@@ -1064,6 +1083,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(proxyOut);
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
+      IOUtils.closeStream(replyOut);
     }
 
     //update metrics
@@ -1077,7 +1097,7 @@ class DataXceiver extends Receiver imple
   /**
    * Utility function for sending a response.
    * 
-   * @param opStatus status message to write
+   * @param status status message to write
    * @param message message to send to the client or other DN
    */
   private void sendResponse(Status status,

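blockChecksum() above now honors a request length shorter than the replica's visible length by calling calcPartialBlockChecksum(): the MD5 covers the stored per-chunk CRCs for every full chunk inside the requested range, and the trailing partial chunk, if any, gets a CRC recomputed from the block data itself. A rough stand-alone sketch of that calculation, using CRC32 and in-memory byte arrays in place of Hadoop's DataChecksum and the DataNode streams (all names here are illustrative):

    import java.nio.ByteBuffer;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.zip.CRC32;

    public class PartialChecksumSketch {
      static byte[] partialBlockMd5(byte[] blockData, byte[] storedCrcs,
          long requestLength, int bytesPerCrc) throws NoSuchAlgorithmException {
        final int csize = 4;                       // CRC32 value size in bytes
        MessageDigest md5 = MessageDigest.getInstance("MD5");

        // 1. Digest the stored CRCs of every full chunk within requestLength.
        int fullChunkCrcBytes = (int) (requestLength / bytesPerCrc) * csize;
        md5.update(storedCrcs, 0, fullChunkCrcBytes);

        // 2. Recompute and digest the CRC of the trailing partial chunk.
        int partialLength = (int) (requestLength % bytesPerCrc);
        if (partialLength > 0) {
          CRC32 crc = new CRC32();
          crc.update(blockData, (int) (requestLength - partialLength),
              partialLength);
          md5.update(ByteBuffer.allocate(csize)
              .putInt((int) crc.getValue()).array());
        }
        return md5.digest();
      }
    }
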
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Aug 19 23:49:39 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
   private final PeerServer peerServer;
   private final DataNode datanode;
   private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+  private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
   private boolean closed = false;
   
   /**
@@ -63,14 +64,17 @@ class DataXceiverServer implements Runna
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
+   private int maxThreads;
    
    /**Constructor
     * 
     * @param bandwidth Total amount of bandwidth can be used for balancing 
     */
-   private BlockBalanceThrottler(long bandwidth) {
+   private BlockBalanceThrottler(long bandwidth, int maxThreads) {
      super(bandwidth);
+     this.maxThreads = maxThreads;
      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+     LOG.info("Number threads for balancing is "+ maxThreads);
    }
    
    /** Check if the block move can start. 
@@ -79,7 +83,7 @@ class DataXceiverServer implements Runna
     * the counter is incremented; False otherwise.
     */
    synchronized boolean acquire() {
-     if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+     if (numThreads >= maxThreads) {
        return false;
      }
      numThreads++;
@@ -120,8 +124,10 @@ class DataXceiverServer implements Runna
     
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 
-                   DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
   }
 
   @Override
@@ -212,18 +218,38 @@ class DataXceiverServer implements Runna
     }
   }
   
-  synchronized void addPeer(Peer peer, Thread t) throws IOException {
+  synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+      throws IOException {
     if (closed) {
       throw new IOException("Server closed.");
     }
     peers.put(peer, t);
+    peersXceiver.put(peer, xceiver);
   }
 
   synchronized void closePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer);
     IOUtils.cleanup(null, peer);
   }
 
+  // Sending OOB to all peers
+  public synchronized void sendOOBToPeers() {
+    if (!datanode.shutdownForUpgrade) {
+      return;
+    }
+
+    for (Peer p : peers.keySet()) {
+      try {
+        peersXceiver.get(p).sendOOB();
+      } catch (IOException e) {
+        LOG.warn("Got error when sending OOB message.", e);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when sending OOB message.");
+      }
+    }
+  }
+  
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
   // be set true before calling this method.
@@ -242,6 +268,7 @@ class DataXceiverServer implements Runna
       IOUtils.cleanup(LOG, p);
     }
     peers.clear();
+    peersXceiver.clear();
   }
 
   // Return the number of peers.
@@ -249,7 +276,14 @@ class DataXceiverServer implements Runna
     return peers.size();
   }
 
+  // Return the number of peers and DataXceivers.
+  @VisibleForTesting
+  synchronized int getNumPeersXceiver() {
+    return peersXceiver.size();
+  }
+  
   synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer);
   }
 }

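The BlockBalanceThrottler change above replaces the hard-coded Balancer.MAX_NUM_CONCURRENT_MOVES cap with a value read from configuration (DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY); acquire() and release() are just a synchronized bounded counter. A stripped-down model of that counting, under an illustrative class name:

    public class BoundedMoveThrottler {
      private final int maxThreads;   // cap now supplied by DataXceiverServer
      private int numThreads;         // block moves currently in flight

      public BoundedMoveThrottler(int maxThreads) {
        this.maxThreads = maxThreads;
      }

      /** Take a slot if a new block move may start; otherwise the caller backs off. */
      public synchronized boolean acquire() {
        if (numThreads >= maxThreads) {
          return false;
        }
        numThreads++;
        return true;
      }

      /** Give the slot back when a block move finishes. */
      public synchronized void release() {
        numThreads--;
      }
    }
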
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
 
   public static final String DISK_ERROR = "Possible disk error: ";
 
+  private static final String SEP = System.getProperty("file.separator");
+
   /** Get the cause of an I/O exception if caused by a possible disk error
    * @param ioe an I/O exception
    * @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
   public static File getUnlinkTmpFile(File f) {
     return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
   }
+
+  /**
+   * Checks whether there are any files anywhere in the directory tree rooted
+   * at dir (directories don't count as files). dir must exist.
+   * @param dir root of the directory tree to check
+   * @return true if there are no files
+   * @throws IOException if unable to list subdirectories
+   */
+  public static boolean dirNoFilesRecursive(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents == null) {
+      throw new IOException("Cannot list contents of " + dir);
+    }
+    for (File f : contents) {
+      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the directory where a finalized block with this ID should be stored.
+   * Do not attempt to create the directory.
+   * @param root the root directory where finalized blocks are stored
+   * @param blockId the ID of the finalized block
+   * @return the directory under root where a block with this ID belongs
+   */
+  public static File idToBlockDir(File root, long blockId) {
+    int d1 = (int)((blockId >> 16) & 0xff);
+    int d2 = (int)((blockId >> 8) & 0xff);
+    String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+    return new File(root, path);
+  }
 }

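idToBlockDir() above derives two directory levels from bits 8-23 of the block ID, so the location of a finalized block can always be recomputed from its ID alone. A small worked example (the block ID is arbitrary):

    import java.io.File;

    public class BlockDirExample {
      public static void main(String[] args) {
        long blockId = 0x12345678L;               // arbitrary sample block ID
        int d1 = (int) ((blockId >> 16) & 0xff);  // 0x34 = 52
        int d2 = (int) ((blockId >> 8) & 0xff);   // 0x56 = 86
        // DataStorage.BLOCK_SUBDIR_PREFIX is "subdir", so the block lands in:
        File dir = new File("subdir" + d1, "subdir" + d2);
        System.out.println(dir);                  // prints subdir52/subdir86 on Unix
      }
    }
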
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Aug 19 23:49:39 2014
@@ -108,8 +108,7 @@ public class DirectoryScanner implements
     ScanInfoPerBlockPool(int sz) {super(sz);}
     
     /**
-     * Merges "that" ScanInfoPerBlockPool into this one
-     * @param that
+     * Merges {@code that} ScanInfoPerBlockPool into this one
      */
     public void addAll(ScanInfoPerBlockPool that) {
       if (that == null) return;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Tue Aug 19 23:49:39 2014
@@ -54,7 +54,7 @@ public class FinalizedReplica extends Re
 
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy construct from
    */
   public FinalizedReplica(FinalizedReplica from) {
     super(from);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Tue Aug 19 23:49:39 2014
@@ -68,7 +68,7 @@ public class ReplicaBeingWritten extends
 
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy from
    */
   public ReplicaBeingWritten(ReplicaBeingWritten from) {
     super(from);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Tue Aug 19 23:49:39 2014
@@ -89,7 +89,7 @@ public class ReplicaInPipeline extends R
 
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy from
    */
   public ReplicaInPipeline(ReplicaInPipeline from) {
     super(from);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Tue Aug 19 23:49:39 2014
@@ -40,7 +40,7 @@ public interface ReplicaInPipelineInterf
   
   /**
   * Set the number of bytes that have been acked
-   * @param bytesAcked
+   * @param bytesAcked number of bytes acked
    */
   void setBytesAcked(long bytesAcked);
   

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Tue Aug 19 23:49:39 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
   private File baseDir;
   
   /**
-   * Ints representing the sub directory path from base dir to the directory
-   * containing this replica.
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
    */
-  private int[] subDirs;
+  private boolean hasSubdirs;
   
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
@@ -100,7 +100,7 @@ abstract public class ReplicaInfo extend
 
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy from
    */
   ReplicaInfo(ReplicaInfo from) {
     this(from, from.getVolume(), from.getDir());
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
    * @return the parent directory path where this replica is located
    */
   File getDir() {
-    if (subDirs == null) {
-      return null;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    for (int i : subDirs) {
-      sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
-      sb.append(i);
-      sb.append("/");
-    }
-    File ret = new File(baseDir, sb.toString());
-    return ret;
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
   }
 
   /**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
 
   private void setDirInternal(File dir) {
     if (dir == null) {
-      subDirs = null;
       baseDir = null;
       return;
     }
 
-    ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
-    this.subDirs = replicaDirInfo.subDirs;
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubidrs;
     
     synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
         // Create a new String path of this file and make a brand new File object
         // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(replicaDirInfo.baseDirPath);
-        internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
       }
-      this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
     }
   }
-  
+
   @VisibleForTesting
   public static class ReplicaDirInfo {
-    @VisibleForTesting
     public String baseDirPath;
-    
-    @VisibleForTesting
-    public int[] subDirs;
+    public boolean hasSubidrs;
+
+    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubidrs = hasSubidrs;
+    }
   }
   
   @VisibleForTesting
-  public static ReplicaDirInfo parseSubDirs(File dir) {
-    ReplicaDirInfo ret = new ReplicaDirInfo();
+  public static ReplicaDirInfo parseBaseDir(File dir) {
     
     File currentDir = dir;
-    List<Integer> subDirList = new ArrayList<Integer>();
+    boolean hasSubdirs = false;
     while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      // Prepend the integer into the list.
-      subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
-          DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+      hasSubdirs = true;
       currentDir = currentDir.getParentFile();
     }
-    ret.subDirs = new int[subDirList.size()];
-    for (int i = 0; i < subDirList.size(); i++) {
-      ret.subDirs[i] = subDirList.get(i);
-    }
-    
-    ret.baseDirPath = currentDir.getAbsolutePath();
     
-    return ret;
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
   }
 
   /**

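The ReplicaInfo change above stops recording the subdir chain as an int[] per replica: only a boolean is kept, and when the replica does live under subdirectories the exact path is regenerated from the block ID via DatanodeUtil.idToBlockDir(). A compact sketch of both halves of that bookkeeping, with the layout arithmetic inlined and illustrative names:

    import java.io.File;

    public class ReplicaDirSketch {
      /** Walk up while components look like subdirN; remember only that they existed. */
      static boolean parseHasSubdirs(File dir) {
        boolean hasSubdirs = false;
        for (File cur = dir; cur != null && cur.getName().startsWith("subdir");
             cur = cur.getParentFile()) {
          hasSubdirs = true;
        }
        return hasSubdirs;
      }

      /** Regenerate the replica directory from the block ID instead of storing it. */
      static File getDir(File baseDir, boolean hasSubdirs, long blockId) {
        if (!hasSubdirs) {
          return baseDir;
        }
        int d1 = (int) ((blockId >> 16) & 0xff);
        int d2 = (int) ((blockId >> 8) & 0xff);   // same math as idToBlockDir()
        return new File(baseDir, "subdir" + d1 + File.separator + "subdir" + d2);
      }
    }
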
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Tue Aug 19 23:49:39 2014
@@ -50,7 +50,7 @@ public class ReplicaUnderRecovery extend
 
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy from
    */
   public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
     super(from);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Tue Aug 19 23:49:39 2014
@@ -60,7 +60,7 @@ public class ReplicaWaitingToBeRecovered
   
   /**
    * Copy constructor.
-   * @param from
+   * @param from where to copy from
    */
   public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
     super(from);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Tue Aug 19 23:49:39 2014
@@ -74,7 +74,7 @@ import com.google.common.collect.HashMul
  * DN also marks the block's slots as "unanchorable" to prevent additional 
  * clients from initiating these operations in the future.
  * 
- * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ * The counterpart of this class on the client is {@link DfsClientShmManager}.
  */
 public class ShortCircuitRegistry {
   public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@@ -217,7 +217,32 @@ public class ShortCircuitRegistry {
     }
     return allowMunlock;
   }
-  
+
+  /**
+   * Invalidate any slot associated with a blockId that we are invalidating
+   * (deleting) from this DataNode.  When a slot is invalid, the DFSClient will
+   * not use the corresponding replica for new read or mmap operations (although
+   * existing, ongoing read or mmap operations will complete.)
+   *
+   * @param blockId        The block ID.
+   */
+  public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
+    if (!enabled) return;
+    final Set<Slot> affectedSlots = slots.get(blockId);
+    if (!affectedSlots.isEmpty()) {
+      final StringBuilder bld = new StringBuilder();
+      String prefix = "";
+      bld.append("Block ").append(blockId).append(" has been invalidated.  ").
+          append("Marking short-circuit slots as invalid: ");
+      for (Slot slot : affectedSlots) {
+        slot.makeInvalid();
+        bld.append(prefix).append(slot.toString());
+        prefix = ", ";
+      }
+      LOG.info(bld.toString());
+    }
+  }
+
   public static class NewShmInfo implements Closeable {
     public final ShmId shmId;
     public final FileInputStream stream;

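[Editor's note] The new ShortCircuitRegistry#processBlockInvalidation walks every registered slot for the invalidated block and flips it to invalid so clients stop issuing new reads or mmaps against the deleted replica. The toy registry below sketches that pattern with plain JDK collections; Slot and the long block key are simplified stand-ins for the real Slot/ExtendedBlockId types, not the ShortCircuitRegistry API itself.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Minimal sketch of the slot-invalidation pattern. */
public class SlotInvalidationSketch {
  static final class Slot {
    private final String name;
    private volatile boolean valid = true;
    Slot(String name) { this.name = name; }
    void makeInvalid() { valid = false; }
    boolean isValid() { return valid; }
    @Override public String toString() { return name + (valid ? "(valid)" : "(invalid)"); }
  }

  private final Map<Long, Set<Slot>> slotsByBlock = new HashMap<>();

  synchronized void register(long blockId, Slot slot) {
    slotsByBlock.computeIfAbsent(blockId, k -> new HashSet<>()).add(slot);
  }

  /** Analogue of processBlockInvalidation for this toy registry. */
  synchronized void processBlockInvalidation(long blockId) {
    Set<Slot> affected = slotsByBlock.getOrDefault(blockId, Collections.emptySet());
    if (affected.isEmpty()) {
      return;
    }
    StringBuilder bld = new StringBuilder("Block ").append(blockId)
        .append(" invalidated; marking slots invalid: ");
    String prefix = "";
    for (Slot slot : affected) {
      slot.makeInvalid();                 // new readers must go back through the DN
      bld.append(prefix).append(slot);
      prefix = ", ";
    }
    System.out.println(bld);
  }

  public static void main(String[] args) {
    SlotInvalidationSketch registry = new SlotInvalidationSketch();
    Slot s = new Slot("slot-1");
    registry.register(42L, s);
    registry.processBlockInvalidation(42L);
    System.out.println("slot valid? " + s.isValid());   // false
  }
}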
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Tue Aug 19 23:49:39 2014
@@ -78,7 +78,8 @@ public class StorageLocation {
    * @return A StorageLocation object if successfully parsed, null otherwise.
    *         Does not throw any exceptions.
    */
-  static StorageLocation parse(String rawLocation) throws IOException {
+  public static StorageLocation parse(String rawLocation)
+      throws IOException, SecurityException {
     Matcher matcher = regex.matcher(rawLocation);
     StorageType storageType = StorageType.DEFAULT;
     String location = rawLocation;

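[Editor's note] StorageLocation.parse is now public so other components can turn a configured data-directory string into a StorageLocation. As a rough illustration of the "[STORAGETYPE]path" convention it appears to accept, here is a standalone parser sketch; the regex and the DEFAULT fallback are assumptions for illustration, not the class's actual implementation.

import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hedged sketch of parsing an optional bracketed storage type prefix. */
public class StorageLocationParseSketch {
  private static final Pattern RE = Pattern.compile("^\\[(\\w*)\\](.+)$");

  static String[] parse(String rawLocation) {
    Matcher m = RE.matcher(rawLocation);
    String type = "DEFAULT";
    String location = rawLocation;
    if (m.matches()) {
      if (!m.group(1).isEmpty()) {
        type = m.group(1).toUpperCase();   // e.g. DISK, SSD
      }
      location = m.group(2);
    }
    return new String[] { type, location };
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(parse("[SSD]/data/dn1")));  // [SSD, /data/dn1]
    System.out.println(Arrays.toString(parse("/data/dn2")));       // [DEFAULT, /data/dn2]
  }
}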
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java Tue Aug 19 23:49:39 2014
@@ -45,11 +45,19 @@ public class AvailableSpaceVolumeChoosin
   
   private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
   
-  private static final Random RAND = new Random();
+  private final Random random;
   
   private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
   private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
 
+  AvailableSpaceVolumeChoosingPolicy(Random random) {
+    this.random = random;
+  }
+
+  public AvailableSpaceVolumeChoosingPolicy() {
+    this(new Random());
+  }
+
   @Override
   public synchronized void setConf(Configuration conf) {
     balancedSpaceThreshold = conf.getLong(
@@ -128,7 +136,7 @@ public class AvailableSpaceVolumeChoosin
           (highAvailableVolumes.size() * balancedPreferencePercent) /
           preferencePercentScaler;
       if (mostAvailableAmongLowVolumes < replicaSize ||
-          RAND.nextFloat() < scaledPreferencePercent) {
+          random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
             highAvailableVolumes,
             replicaSize);
@@ -165,13 +173,8 @@ public class AvailableSpaceVolumeChoosin
     }
     
     /**
-     * Check if the available space on all the volumes is roughly equal.
-     * 
-     * @param volumes the volumes to check
-     * @return true if all volumes' free space is within the configured threshold,
-     *         false otherwise.
-     * @throws IOException
-     *           in the event of error checking amount of available space
+     * @return true if all volumes' free space is within the
+     *         configured threshold, false otherwise.
      */
     public boolean areAllVolumesWithinFreeSpaceThreshold() {
       long leastAvailable = Long.MAX_VALUE;

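[Editor's note] Making Random an injectable constructor argument is what lets tests drive the available-space policy deterministically: with a seeded Random, the coin flip between the low-free-space and high-free-space volume groups becomes reproducible. The sketch below mirrors the shape of that weighted choice (the scaling formula is illustrative, not the policy's exact math) and shows the seeded-Random usage the new package-private constructor enables.

import java.util.Random;

/** Sketch of a testable, Random-injected biased volume choice. */
public class BiasedVolumeChoiceSketch {
  private final Random random;
  private final float balancedPreferencePercent;

  BiasedVolumeChoiceSketch(Random random, float balancedPreferencePercent) {
    this.random = random;
    this.balancedPreferencePercent = balancedPreferencePercent;
  }

  /** Returns true when the high-free-space group should serve this replica. */
  boolean preferHighAvailableGroup(int lowCount, int highCount) {
    // Scale the configured preference by the relative size of the two groups,
    // mirroring the shape (not necessarily the constants) of the real policy.
    float scaler = balancedPreferencePercent * highCount
        + (1.0f - balancedPreferencePercent) * lowCount;
    float scaledPreference = (highCount * balancedPreferencePercent) / scaler;
    return random.nextFloat() < scaledPreference;
  }

  public static void main(String[] args) {
    // A seeded Random makes the decision reproducible, which is exactly what
    // the new package-private constructor in the diff enables for tests.
    BiasedVolumeChoiceSketch policy =
        new BiasedVolumeChoiceSketch(new Random(12345L), 0.75f);
    int prefers = 0;
    for (int i = 0; i < 1000; i++) {
      if (policy.preferHighAvailableGroup(2, 2)) {
        prefers++;
      }
    }
    System.out.println("high-available group chosen " + prefers + "/1000 times");
  }
}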
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Aug 19 23:49:39 2014
@@ -19,14 +19,17 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -89,6 +93,10 @@ public interface FsDatasetSpi<V extends 
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** Add a collection of StorageLocations to FsDataset. */
+  public void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException;
+
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
 
@@ -124,16 +132,14 @@ public interface FsDatasetSpi<V extends 
 
   /**
    * Returns the specified block's on-disk length (excluding metadata)
-   * @param b
    * @return   the specified block's on-disk length (excluding metadata)
-   * @throws IOException
+   * @throws IOException on error
    */
   public long getLength(ExtendedBlock b) throws IOException;
 
   /**
    * Get reference to the replica meta info in the replicasMap. 
    * To be called from methods that are synchronized on {@link FSDataset}
-   * @param blockId
    * @return replica from the replicas map
    */
   @Deprecated
@@ -151,8 +157,8 @@ public interface FsDatasetSpi<V extends 
   
   /**
    * Returns an input stream at specified offset of the specified block
-   * @param b
-   * @param seekOffset
+   * @param b block
+   * @param seekOffset offset within the block to seek to
    * @return an input stream to read the contents of the specified block,
    *  starting at the offset
    * @throws IOException
@@ -163,9 +169,6 @@ public interface FsDatasetSpi<V extends 
   /**
    * Returns an input stream at specified offset of the specified block
    * The block is still in the tmp directory and is not finalized
-   * @param b
-   * @param blkoff
-   * @param ckoff
    * @return an input stream to read the contents of the specified block,
    *  starting at the offset
    * @throws IOException
@@ -180,8 +183,8 @@ public interface FsDatasetSpi<V extends 
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
-      ) throws IOException;
+  public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+      ExtendedBlock b) throws IOException;
 
   /**
    * Creates a RBW replica and returns the meta info of the replica
@@ -190,8 +193,8 @@ public interface FsDatasetSpi<V extends 
    * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public ReplicaInPipelineInterface createRbw(ExtendedBlock b
-      ) throws IOException;
+  public ReplicaInPipelineInterface createRbw(StorageType storageType,
+      ExtendedBlock b) throws IOException;
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica
@@ -256,7 +259,6 @@ public interface FsDatasetSpi<V extends 
    * Finalizes the block previously opened for writing using writeToBlock.
    * The block size is what is in the parameter b and it must match the amount
    *  of data written
-   * @param b
    * @throws IOException
    */
   public void finalizeBlock(ExtendedBlock b) throws IOException;
@@ -264,7 +266,6 @@ public interface FsDatasetSpi<V extends 
   /**
    * Unfinalizes the block previously opened for writing using writeToBlock.
    * The temporary file associated with this block is deleted.
-   * @param b
    * @throws IOException
    */
   public void unfinalizeBlock(ExtendedBlock b) throws IOException;
@@ -289,14 +290,12 @@ public interface FsDatasetSpi<V extends 
 
   /**
    * Is the block valid?
-   * @param b
    * @return - true if the specified block is valid
    */
   public boolean isValidBlock(ExtendedBlock b);
 
   /**
    * Is the block a valid RBW?
-   * @param b
    * @return - true if the specified block is a valid RBW
    */
   public boolean isValidRbw(ExtendedBlock b);
@@ -327,7 +326,7 @@ public interface FsDatasetSpi<V extends 
    * Determine if the specified block is cached.
    * @param bpid Block pool id
    * @param blockIds - block id
-   * @returns true if the block is cached
+   * @return true if the block is cached
    */
   public boolean isCached(String bpid, long blockId);
 
@@ -440,5 +439,12 @@ public interface FsDatasetSpi<V extends 
    * @return true when trash is enabled
    */
   public boolean trashEnabled(String bpid);
+
+  /**
+   * Submit a sync_file_range request to AsyncDiskService.
+   */
+  public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
+      final FileDescriptor fd, final long offset, final long nbytes,
+      final int flags);
 }
 

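[Editor's note] The FsDatasetSpi changes thread a StorageType through createTemporary/createRbw and add addVolumes, i.e. the dataset can now be told which kind of media a new replica should land on and can grow its volume set at runtime. The toy class below illustrates why the extra parameter matters, restricting a round-robin volume choice to the requested type; the enum, Volume class and selection logic are simplified stand-ins, not the FsDatasetSpi contract.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Toy storage-type-aware volume selection. */
public class StorageTypeAwareSelectionSketch {
  enum StorageType { DISK, SSD }

  static final class Volume {
    final String path;
    final StorageType type;
    Volume(String path, StorageType type) { this.path = path; this.type = type; }
  }

  private final List<Volume> volumes = new ArrayList<>();
  private int next = 0;

  void addVolume(Volume v) { volumes.add(v); }

  /** Pick the next volume of the requested type, round-robin. */
  Volume chooseVolume(StorageType requested) throws IOException {
    for (int i = 0; i < volumes.size(); i++) {
      Volume v = volumes.get((next + i) % volumes.size());
      if (v.type == requested) {
        next = (next + i + 1) % volumes.size();
        return v;
      }
    }
    throw new IOException("No volume of type " + requested + " available");
  }

  public static void main(String[] args) throws IOException {
    StorageTypeAwareSelectionSketch dataset = new StorageTypeAwareSelectionSketch();
    dataset.addVolume(new Volume("/data/disk1", StorageType.DISK));
    dataset.addVolume(new Volume("/data/ssd1", StorageType.SSD));
    System.out.println(dataset.chooseVolume(StorageType.SSD).path);   // /data/ssd1
  }
}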
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Aug 19 23:49:39 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
   private final String bpid;
   private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
   private final File currentDir; // StorageDirectory/current/bpid/current
-  private final LDir finalizedDir; // directory store Finalized replica
+  // directory where finalized replicas are stored
+  private final File finalizedDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
@@ -74,7 +75,7 @@ class BlockPoolSlice {
    * @param bpid Block pool Id
    * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
    * @param bpDir directory corresponding to the BlockPool
-   * @param conf
+   * @param conf configuration
    * @throws IOException
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
@@ -82,8 +83,13 @@ class BlockPoolSlice {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
-    final File finalizedDir = new File(
+    this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    if (!this.finalizedDir.exists()) {
+      if (!this.finalizedDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.finalizedDir);
+      }
+    }
 
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
@@ -95,16 +101,6 @@ class BlockPoolSlice {
       FileUtil.fullyDelete(tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
-    final boolean supportAppends = conf.getBoolean(
-        DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
-        DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
-    if (rbwDir.exists() && !supportAppends) {
-      FileUtil.fullyDelete(rbwDir);
-    }
-    final int maxBlocksPerDir = conf.getInt(
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-    this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
     if (!rbwDir.mkdirs()) {  // create rbw directory if not exist
       if (!rbwDir.isDirectory()) {
         throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -137,7 +133,7 @@ class BlockPoolSlice {
   }
 
   File getFinalizedDir() {
-    return finalizedDir.dir;
+    return finalizedDir;
   }
   
   File getRbwDir() {
@@ -245,26 +241,57 @@ class BlockPoolSlice {
   }
 
   File addBlock(Block b, File f) throws IOException {
-    File blockFile = finalizedDir.addBlock(b, f);
+    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    if (!blockDir.exists()) {
+      if (!blockDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + blockDir);
+      }
+    }
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
     
   void checkDirs() throws DiskErrorException {
-    finalizedDir.checkDirTree();
+    DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
     
   void getVolumeMap(ReplicaMap volumeMap) throws IOException {
     // add finalized replicas
-    finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+    addToReplicasMap(volumeMap, finalizedDir, true);
     // add rbw replicas
     addToReplicasMap(volumeMap, rbwDir, false);
   }
 
   /**
+   * Recover an unlinked tmp file on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name and the original name is returned; otherwise the tmp
+   * file is deleted and null is returned.
+   */
+  File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+    File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+    if (blockFile.exists()) {
+      // If the original block file still exists, then no recovery is needed.
+      if (!unlinkedTmp.delete()) {
+        throw new IOException("Unable to cleanup unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return null;
+    } else {
+      if (!unlinkedTmp.renameTo(blockFile)) {
+        throw new IOException("Unable to rename unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return blockFile;
+    }
+  }
+
+  /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
    * @param dir an input directory
@@ -273,23 +300,34 @@ class BlockPoolSlice {
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
       ) throws IOException {
-    File blockFiles[] = FileUtil.listFiles(dir);
-    for (File blockFile : blockFiles) {
-      if (!Block.isBlockFilename(blockFile))
+    File files[] = FileUtil.listFiles(dir);
+    for (File file : files) {
+      if (file.isDirectory()) {
+        addToReplicasMap(volumeMap, file, isFinalized);
+      }
+
+      if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+        file = recoverTempUnlinkedBlock(file);
+        if (file == null) { // the original block still exists, so we cover it
+          // in another iteration and can continue here
+          continue;
+        }
+      }
+      if (!Block.isBlockFilename(file))
         continue;
       
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          blockFiles, blockFile);
-      long blockId = Block.filename2id(blockFile.getName());
+          files, file);
+      long blockId = Block.filename2id(file.getName());
       ReplicaInfo newReplica = null;
       if (isFinalized) {
         newReplica = new FinalizedReplica(blockId, 
-            blockFile.length(), genStamp, volume, blockFile.getParentFile());
+            file.length(), genStamp, volume, file.getParentFile());
       } else {
 
         boolean loadRwr = true;
-        File restartMeta = new File(blockFile.getParent()  +
-            File.pathSeparator + "." + blockFile.getName() + ".restart");
+        File restartMeta = new File(file.getParent()  +
+            File.pathSeparator + "." + file.getName() + ".restart");
         Scanner sc = null;
         try {
           sc = new Scanner(restartMeta);
@@ -297,8 +335,8 @@ class BlockPoolSlice {
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
             newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(blockFile, genStamp), 
-                genStamp, volume, blockFile.getParentFile(), null);
+                validateIntegrityAndSetLength(file, genStamp),
+                genStamp, volume, file.getParentFile(), null);
             loadRwr = false;
           }
           sc.close();
@@ -307,7 +345,7 @@ class BlockPoolSlice {
               restartMeta.getPath());
           }
         } catch (FileNotFoundException fnfe) {
-          // nothing to do here
+          // nothing to do here
         } finally {
           if (sc != null) {
             sc.close();
@@ -316,15 +354,15 @@ class BlockPoolSlice {
         // Restart meta doesn't exist or expired.
         if (loadRwr) {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(blockFile, genStamp), 
-              genStamp, volume, blockFile.getParentFile());
+              validateIntegrityAndSetLength(file, genStamp),
+              genStamp, volume, file.getParentFile());
         }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
       if (oldReplica != null) {
         FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+            "on disk: " + oldReplica.getBlockFile() + " and " + file );
       }
     }
   }
@@ -411,10 +449,6 @@ class BlockPoolSlice {
     }
   }
     
-  void clearPath(File f) {
-    finalizedDir.clearPath(f);
-  }
-    
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

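[Editor's note] With LDir gone, BlockPoolSlice builds the volume map by recursively walking the finalized directory tree itself (addToReplicasMap now descends into subdirectories). The standalone sketch below shows that recursive scan in isolation; the "blk_"/".meta" name checks are an assumption standing in for Block.isBlockFilename and the real metadata handling.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Sketch of the recursive block-file scan replacing the LDir-based walk. */
public class ReplicaScanSketch {
  static void collectBlockFiles(File dir, List<File> out) {
    File[] files = dir.listFiles();
    if (files == null) {
      return;                                  // unreadable or not a directory
    }
    for (File file : files) {
      if (file.isDirectory()) {
        collectBlockFiles(file, out);          // recurse, like addToReplicasMap
        continue;
      }
      if (file.getName().startsWith("blk_") && !file.getName().endsWith(".meta")) {
        out.add(file);                         // a candidate block file
      }
    }
  }

  public static void main(String[] args) {
    List<File> blocks = new ArrayList<>();
    collectBlockFiles(new File("/data/current/BP-1/current/finalized"), blocks);
    System.out.println("found " + blocks.size() + " block files");
  }
}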
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 
 /**
  * This class is a container of multiple thread pools, each for a volume,
@@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.pro
  * can be slow, and we don't want to use a single thread pool because that
  * is inefficient when we have more than 1 volume.  AsyncDiskService is the
  * solution for these.
+ * Another example of an async disk operation is requesting sync_file_range().
  * 
  * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
  * They should be combined.
@@ -57,6 +61,7 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
   
   private final DataNode datanode;
+  private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
   
@@ -66,42 +71,52 @@ class FsDatasetAsyncDiskService {
    * 
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
-   * 
-   * @param volumes The roots of the data volumes.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+  FsDatasetAsyncDiskService(DataNode datanode) {
     this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+      int counter = 0;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        int thisIndex;
+        synchronized (this) {
+          thisIndex = counter++;
+        }
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async disk worker #" + thisIndex +
+            " for volume " + volume);
+        return t;
+      }
+    };
 
-    final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
-    // Create one ThreadPool per volume
-    for (int v = 0 ; v < volumes.length; v++) {
-      final File vol = volumes[v];
-      ThreadFactory threadFactory = new ThreadFactory() {
-          int counter = 0;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            int thisIndex;
-            synchronized (this) {
-              thisIndex = counter++;
-            }
-            Thread t = new Thread(threadGroup, r);
-            t.setName("Async disk worker #" + thisIndex +
-                      " for volume " + vol);
-            return t;
-          }
-        };
-
-      ThreadPoolExecutor executor = new ThreadPoolExecutor(
-          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, 
-          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
-          new LinkedBlockingQueue<Runnable>(), threadFactory);
-
-      // This can reduce the number of running threads
-      executor.allowCoreThreadTimeOut(true);
-      executors.put(vol, executor);
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncDiskService for a new volume
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " is already existed.");
+    }
+    addExecutorForVolume(volume);
   }
   
   synchronized long countPendingDeletions() {
@@ -148,6 +163,21 @@ class FsDatasetAsyncDiskService {
     }
   }
 
+  public void submitSyncFileRangeRequest(FsVolumeImpl volume,
+      final FileDescriptor fd, final long offset, final long nbytes,
+      final int flags) {
+    execute(volume.getCurrentDir(), new Runnable() {
+      @Override
+      public void run() {
+        try {
+          NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
+        } catch (NativeIOException e) {
+          LOG.warn("sync_file_range error", e);
+        }
+      }
+    });
+  }
+
   /**
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.