Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [19/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 21 06:37:27 2013
@@ -27,8 +27,10 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.security.NoSuchAlgorithmException;
@@ -56,6 +58,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -76,11 +79,9 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation;
@@ -106,6 +107,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -126,6 +128,7 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -226,12 +229,26 @@ public class DataNode extends Configured
   int socketTimeout;
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
+  private boolean dropCacheBehindWrites = false;
+  private boolean syncBehindWrites = false;
+  private boolean dropCacheBehindReads = false;
+  private long readaheadLength = 0;
+
   int writePacketSize = 0;
-  private boolean supportAppends;
+  private boolean durableSync;
   boolean isBlockTokenEnabled;
   BlockTokenSecretManager blockTokenSecretManager;
   boolean isBlockTokenInitialized = false;
+  boolean syncOnClose;
+
   final String userWithLocalPathAccess;
+  private boolean connectToDnViaHostname;
+  private boolean relaxedVersionCheck;
+
+  /**
+   * Whether the DN completely skips version check with the NN.
+   */
+  private boolean noVersionCheck;
 
   /**
    * Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -242,6 +259,9 @@ public class DataNode extends Configured
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
   
+  /** Activated plug-ins. */
+  private List<ServicePlugin> plugins;
+  
   private static final Random R = new Random();
   
   public static final String DATA_DIR_KEY = "dfs.data.dir";
@@ -267,6 +287,8 @@ public class DataNode extends Configured
   public Server ipcServer;
 
   private SecureResources secureResources = null;
+
+  ReadaheadPool readaheadPool;
   
   /**
    * Current system time.
@@ -297,7 +319,7 @@ public class DataNode extends Configured
         DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
 
     datanodeObject = this;
-    supportAppends = conf.getBoolean("dfs.support.append", false);
+    durableSync = conf.getBoolean("dfs.durable.sync", true);
     this.userWithLocalPathAccess = conf
         .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
     try {
@@ -351,6 +373,13 @@ public class DataNode extends Configured
                                              true);
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
 
+    this.relaxedVersionCheck = conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY,
+        CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_DEFAULT);
+    noVersionCheck = conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY,
+        CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_DEFAULT);
+
     InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
     int tmpPort = socAddr.getPort();
     storage = new DataStorage();
@@ -416,13 +445,26 @@ public class DataNode extends Configured
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
     this.dnRegistration.setName(machineName + ":" + tmpPort);
-    LOG.info("Opened info server at " + tmpPort);
+    LOG.info("Opened data transfer server at " + tmpPort);
       
     this.threadGroup = new ThreadGroup("dataXceiverServer");
     this.dataXceiverServer = new Daemon(threadGroup, 
         new DataXceiverServer(ss, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
+    this.readaheadLength = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    this.dropCacheBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+    this.syncBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    this.dropCacheBehindReads = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
     this.blockReportInterval =
       conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
     this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
@@ -433,6 +475,11 @@ public class DataNode extends Configured
         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
     }
     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+    
+    // do we need to sync block file contents to disk when blockfile is closed?
+    this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY, 
+                                       DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
+
     DataNode.nameNodeAddr = nameNodeAddr;
 
     //initialize periodic block scanner
@@ -446,9 +493,16 @@ public class DataNode extends Configured
       blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
     } else {
       LOG.info("Periodic Block Verification is disabled because " +
-               reason + ".");
+               reason);
     }
 
+    readaheadPool = ReadaheadPool.getInstance();
+
+    this.connectToDnViaHostname = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+
     //create a servlet to serve full-file content
     InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
     String infoHost = infoSocAddr.getHostName();
@@ -508,6 +562,16 @@ public class DataNode extends Configured
     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
 
     LOG.info("dnRegistration = " + dnRegistration);
+    
+    plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
+    for (ServicePlugin p: plugins) {
+      try {
+        p.start(this);
+        LOG.info("Started plug-in " + p);
+      } catch (Throwable t) {
+        LOG.warn("ServicePlugin " + p + " could not be started", t);
+      }
+    }
   }
   
   private ObjectName mxBean = null;
@@ -547,6 +611,43 @@ public class DataNode extends Configured
            SocketChannel.open().socket() : new Socket();                                   
   }
   
+  /**
+   * @return true if this datanode is permitted to connect to
+   *    the given namenode version
+   */
+  boolean isPermittedVersion(NamespaceInfo nsInfo) {
+    boolean versionMatch =
+      nsInfo.getVersion().equals(VersionInfo.getVersion());
+    boolean revisionMatch =
+      nsInfo.getRevision().equals(VersionInfo.getRevision());
+    if (revisionMatch && !versionMatch) {
+      throw new AssertionError("Invalid build. The revisions match" +
+          " but the NN version is " + nsInfo.getVersion() +
+          " and the DN version is " + VersionInfo.getVersion());
+    }
+    if (noVersionCheck) {
+      LOG.info("Permitting datanode version '" + VersionInfo.getVersion() +
+          "' and revision '" + VersionInfo.getRevision() +
+          "' to connect to namenode version '" + nsInfo.getVersion() +
+          "' and revision '" + nsInfo.getRevision() + "' because " +
+          CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+          " is enabled");
+      return true;
+    } else {
+      if (relaxedVersionCheck) {
+        if (versionMatch && !revisionMatch) {
+          LOG.info("Permitting datanode revision " + VersionInfo.getRevision() +
+              " to connect to namenode revision " + nsInfo.getRevision() +
+              " because " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+              " is enabled");
+        }
+        return versionMatch;
+      } else {
+        return revisionMatch;
+      }
+    }
+  }
+
   private NamespaceInfo handshake() throws IOException {
     NamespaceInfo nsInfo = new NamespaceInfo();
     while (shouldRun) {
@@ -560,13 +661,17 @@ public class DataNode extends Configured
         } catch (InterruptedException ie) {}
       }
     }
-    String errorMsg = null;
-    // verify build version
-    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
-      errorMsg = "Incompatible build versions: namenode BV = " 
-        + nsInfo.getBuildVersion() + "; datanode BV = "
-        + Storage.getBuildVersion();
-      LOG.fatal( errorMsg );
+    if (!isPermittedVersion(nsInfo)) {
+      String errorMsg = "Shutting down. Incompatible version or revision. " +
+          "DataNode version '" + VersionInfo.getVersion() +
+          "' and revision '" + VersionInfo.getRevision() +
+          "' and NameNode version '" + nsInfo.getVersion() +
+          "' and revision '" + nsInfo.getRevision() +
+          "' and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+          " is " + (relaxedVersionCheck ? "enabled" : "not enabled") +
+          " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+          " is " + (noVersionCheck ? "enabled" : "not enabled");
+      LOG.fatal(errorMsg);
       notifyNamenode(DatanodeProtocol.NOTIFY, errorMsg);  
       throw new IOException( errorMsg );
     }
@@ -584,9 +689,10 @@ public class DataNode extends Configured
   } 
 
   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
-      DatanodeID datanodeid, final Configuration conf, final int socketTimeout) throws IOException {
-    final InetSocketAddress addr = NetUtils.createSocketAddr(
-        datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+      DatanodeInfo info, final Configuration conf, final int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
+    final String dnName = info.getNameWithIpcPort(connectToDnViaHostname);
+    final InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
     }
@@ -720,12 +826,13 @@ public class DataNode extends Configured
       dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
     }
 
-    if (supportAppends) {
+    if (durableSync) {
       Block[] bbwReport = data.getBlocksBeingWrittenReport();
-      long[] blocksBeingWritten = BlockListAsLongs
-          .convertToArrayLongs(bbwReport);
+      long[] blocksBeingWritten =
+        BlockListAsLongs.convertToArrayLongs(bbwReport);
       namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
     }
+
     // random short delay - helps scatter the BR from all DNs
     // - but we can start generating the block report immediately
     data.requestAsyncBlockReport();
@@ -739,6 +846,17 @@ public class DataNode extends Configured
    * Otherwise, deadlock might occur.
    */
   public void shutdown() {
+    if (plugins != null) {
+      for (ServicePlugin p : plugins) {
+        try {
+          p.stop();
+          LOG.info("Stopped plug-in " + p);
+        } catch (Throwable t) {
+          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+        }
+      }
+    }
+    
     this.unRegisterMXBean();
     if (infoServer != null) {
       try {
@@ -812,10 +930,19 @@ public class DataNode extends Configured
   /** Check if there is no space in disk 
    *  @param e that caused this checkDiskError call
    **/
-  protected void checkDiskError(Exception e ) throws IOException {
-    
-    LOG.warn("checkDiskError: exception: ", e);
-    
+  protected void checkDiskError(Exception e) throws IOException {
+    LOG.warn("checkDiskError: exception: ", e);
+    // Guard against a null message before the startsWith/contains checks
+    final String msg = e.getMessage() == null ? "" : e.getMessage();
+    if (e instanceof SocketException || e instanceof SocketTimeoutException
+        || e instanceof ClosedByInterruptException
+        || msg.startsWith("An established connection was aborted")
+        || msg.startsWith("Broken pipe")
+        || msg.startsWith("Connection reset")
+        || msg.contains("java.nio.channels.SocketChannel")) {
+      LOG.info("Not checking disk as checkDiskError was called on a network" +
+          " related exception");
+      return;
+    }
+
     if (e.getMessage() != null &&
         e.getMessage().startsWith("No space left on device")) {
       throw new DiskOutOfSpaceException("No space left on device");
@@ -872,7 +999,8 @@ public class DataNode extends Configured
   }
     
   /** Number of concurrent xceivers per node. */
-  int getXceiverCount() {
+  @Override // DataNodeMXBean
+  public int getXceiverCount() {
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
     
@@ -881,7 +1009,6 @@ public class DataNode extends Configured
    * forever calling remote NameNode functions.
    */
   public void offerService() throws Exception {
-     
     LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
        " Initial delay: " + initialBlockReportDelay + "msec");
 
@@ -913,7 +1040,6 @@ public class DataNode extends Configured
                                                        xmitsInProgress.get(),
                                                        getXceiverCount());
           myMetrics.addHeartBeat(now() - startTime);
-          //LOG.info("Just sent heartbeat, with name " + localName);
           if (!processCommand(cmds))
             continue;
         }
@@ -1012,7 +1138,7 @@ public class DataNode extends Configured
         // start block scanner
         if (blockScanner != null && blockScannerThread == null &&
             upgradeManager.isUpgradeCompleted()) {
-          LOG.info("Starting Periodic block scanner.");
+          LOG.info("Starting Periodic block scanner");
           blockScannerThread = new Daemon(blockScanner);
           blockScannerThread.start();
         }
@@ -1117,7 +1243,7 @@ public class DataNode extends Configured
         }
         data.invalidate(toDelete);
       } catch(IOException e) {
-        checkDiskError();
+        // Exceptions caught here are not expected to be disk-related.
         throw e;
       }
       myMetrics.incrBlocksRemoved(toDelete.length);
@@ -1196,7 +1322,7 @@ public class DataNode extends Configured
                               ) throws IOException {
     if (!data.isValidBlock(block)) {
       // block does not exist or is under-construction
-      String errStr = "Can't send invalid block " + block;
+      String errStr = "Can't send invalid " + block;
       LOG.info(errStr);
       notifyNamenode(DatanodeProtocol.INVALID_BLOCK, errStr);
       return;
@@ -1209,7 +1335,7 @@ public class DataNode extends Configured
       namenode.reportBadBlocks(new LocatedBlock[]{
           new LocatedBlock(block, new DatanodeInfo[] {
               new DatanodeInfo(dnRegistration)})});
-      LOG.info("Can't replicate block " + block
+      LOG.info("Can't replicate " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
       return;
@@ -1223,7 +1349,7 @@ public class DataNode extends Configured
           xfersBuilder.append(xferTargets[i].getName());
           xfersBuilder.append(" ");
         }
-        LOG.info(dnRegistration + " Starting thread to transfer block " + 
+        LOG.info(dnRegistration + " Starting thread to transfer " + 
                  block + " to " + xfersBuilder);                       
       }
 
@@ -1238,7 +1364,7 @@ public class DataNode extends Configured
       try {
         transferBlock(blocks[i], xferTargets[i]);
       } catch (IOException ie) {
-        LOG.warn("Failed to transfer block " + blocks[i], ie);
+        LOG.warn("Failed to transfer " + blocks[i], ie);
       }
     }
   }
@@ -1383,9 +1509,10 @@ public class DataNode extends Configured
       BlockSender blockSender = null;
       
       try {
-        InetSocketAddress curTarget = 
-          NetUtils.createSocketAddr(targets[0].getName());
+        final String dnName = targets[0].getName(connectToDnViaHostname);
+        InetSocketAddress curTarget = NetUtils.createSocketAddr(dnName);
         sock = newSocket();
+        LOG.debug("Connecting to " + dnName);
         NetUtils.connect(sock, curTarget, socketTimeout);
         sock.setSoTimeout(targets.length * socketTimeout);
 
@@ -1426,14 +1553,17 @@ public class DataNode extends Configured
         blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
-        LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
+        LOG.info(dnRegistration + ":Transmitted " + b + " to " + curTarget);
 
       } catch (IOException ie) {
         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
         // check if there are any disk problem
-        datanode.checkDiskError();
-        
+        try {
+          checkDiskError(ie);
+        } catch (IOException e) {
+          LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
+        }
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -1512,7 +1642,7 @@ public class DataNode extends Configured
       conf = new Configuration();
     if (!parseArguments(args, conf)) {
       printUsage();
-      return null;
+      System.exit(-2);
     }
     if (conf.get("dfs.network.script") != null) {
       LOG.error("This configuration for rack identification is not supported" +
@@ -1747,9 +1877,9 @@ public class DataNode extends Configured
       data.finalizeBlockIfNeeded(newblock);
       myMetrics.incrBlocksWritten();
       notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
-      LOG.info("Received block " + newblock +
+      LOG.info("Received " + newblock +
                 " of size " + newblock.getNumBytes() +
-                " as part of lease recovery.");
+                " as part of lease recovery");
     }
   }
 
@@ -1875,13 +2005,12 @@ public class DataNode extends Configured
   private LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets, boolean closeFile) throws IOException {
 
-    DatanodeID[] datanodeids = (DatanodeID[])targets;
     // If the block is already being recovered, then skip recovering it.
     // This can happen if the namenode and client start recovering the same
     // file at the same time.
     synchronized (ongoingRecovery) {
       if (ongoingRecovery.get(block.getWithWildcardGS()) != null) {
-        String msg = "Block " + block + " is already being recovered, " +
+        String msg = block + " is already being recovered," +
                      " ignoring this request to recover it.";
         LOG.info(msg);
         throw new IOException(msg);
@@ -1901,13 +2030,14 @@ public class DataNode extends Configured
       int rwrCount = 0;
       
       List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
-      for(DatanodeID id : datanodeids) {
+      for (DatanodeInfo id : targets) {
         try {
-          InterDatanodeProtocol datanode = dnRegistration.equals(id)?
-              this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
+          InterDatanodeProtocol datanode = dnRegistration.equals(id) ? this 
+            : DataNode.createInterDataNodeProtocolProxy(
+                id, getConf(), socketTimeout, connectToDnViaHostname);
           BlockRecoveryInfo info = datanode.startBlockRecovery(block);
           if (info == null) {
-            LOG.info("No block metadata found for block " + block + " on datanode "
+            LOG.info("No block metadata found for " + block + " on datanode "
                 + id);
             continue;
           }
@@ -1962,7 +2092,7 @@ public class DataNode extends Configured
 
       if (syncList.isEmpty() && errorCount > 0) {
         throw new IOException("All datanodes failed: block=" + block
-            + ", datanodeids=" + Arrays.asList(datanodeids));
+            + ", datanodeids=" + Arrays.asList(targets));
       }
       if (!keepLength) {
         block.setNumBytes(minlength);
@@ -2133,4 +2263,20 @@ public class DataNode extends Configured
                        (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
+
+  long getReadaheadLength() {
+    return readaheadLength;
+  }
+
+  boolean shouldDropCacheBehindWrites() {
+    return dropCacheBehindWrites;
+  }
+
+  boolean shouldDropCacheBehindReads() {
+    return dropCacheBehindReads;
+  }
+
+  boolean shouldSyncBehindWrites() {
+    return syncBehindWrites;
+  }
 }
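
The hunks above thread four new OS caching and readahead knobs through the
DataNode. A minimal sketch of setting them programmatically, assuming the
key strings behind the DFSConfigKeys constants referenced above
("dfs.datanode.readahead.bytes" and friends); the wrapper class itself is
illustrative only:

    import org.apache.hadoop.conf.Configuration;

    public class CachingKnobsSketch {
      public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        // Post ~4 MB readahead hints via the shared ReadaheadPool on reads
        conf.setLong("dfs.datanode.readahead.bytes", 4L * 1024 * 1024);
        // Evict block data from the OS page cache once written or served
        conf.setBoolean("dfs.datanode.drop.cache.behind.writes", true);
        conf.setBoolean("dfs.datanode.drop.cache.behind.reads", true);
        // Start background writeback of dirty pages behind the write path
        conf.setBoolean("dfs.datanode.sync.behind.writes", true);
        return conf;
      }
    }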

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java Fri Jun 21 06:37:27 2013
@@ -63,4 +63,10 @@ public interface DataNodeMXBean {
    * @return the volume info
    */
   public String getVolumeInfo();
+
+  /**
+   * Returns an estimate of the number of Datanode threads
+   * actively transferring blocks.
+   */
+  public int getXceiverCount();
 }
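
With getXceiverCount() on the MXBean, the live transfer-thread count is
readable over JMX. A minimal in-process sketch, assuming the bean keeps the
usual branch-1 registration name Hadoop:service=DataNode,name=DataNodeInfo
(the object name is an assumption; it is not shown in this diff):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class XceiverCountProbe {
      public static int xceiverCount() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed registration name; adjust to match the running DataNode
        ObjectName dn =
            new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
        return (Integer) mbs.getAttribute(dn, "XceiverCount");
      }
    }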

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Jun 21 06:37:27 2013
@@ -115,11 +115,11 @@ public class DataStorage extends Storage
           break;
         case NON_EXISTENT:
           // ignore this storage
-          LOG.info("Storage directory " + dataDir + " does not exist.");
+          LOG.info("Storage directory " + dataDir + " does not exist");
           it.remove();
           continue;
         case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted.");
+          LOG.info("Storage directory " + dataDir + " is not formatted");
           LOG.info("Formatting ...");
           format(sd, nsInfo);
           break;
@@ -291,7 +291,7 @@ public class DataStorage extends Storage
     // rename tmp to previous
     rename(tmpDir, prevDir);
     LOG.info( hardLink.linkStats.report());
-    LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
+    LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
   }
 
   void doRollback( StorageDirectory sd,
@@ -327,7 +327,7 @@ public class DataStorage extends Storage
     rename(prevDir, curDir);
     // delete tmp dir
     deleteDir(tmpDir);
-    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
+    LOG.info("Rollback of " + sd.getRoot() + " is complete");
   }
 
   void doFinalize(StorageDirectory sd) throws IOException {
@@ -350,9 +350,9 @@ public class DataStorage extends Storage
           try {
             deleteDir(tmpDir);
           } catch(IOException ex) {
-            LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
+            LOG.error("Finalize upgrade for " + dataDirPath + " failed", ex);
           }
-          LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
+          LOG.info("Finalize upgrade for " + dataDirPath + " is complete");
         }
         public String toString() { return "Finalize " + dataDirPath; }
       }).start();

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jun 21 06:37:27 2013
@@ -28,6 +28,7 @@ import java.net.Socket;
 import java.net.SocketException;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -58,6 +59,7 @@ class DataXceiver implements Runnable, F
   final String localAddress;  // local address of this daemon
   DataNode datanode;
   DataXceiverServer dataXceiverServer;
+  private boolean connectToDnViaHostname;
   
   public DataXceiver(Socket s, DataNode datanode, 
       DataXceiverServer dataXceiverServer) {
@@ -69,6 +71,9 @@ class DataXceiver implements Runnable, F
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
     LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+    this.connectToDnViaHostname = datanode.getConf().getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
   }
 
   /**
@@ -169,7 +174,7 @@ class DataXceiver implements Runnable, F
           out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
           out.flush();
           throw new IOException("Access token verification failed, for client "
-              + remoteAddress + " for OP_READ_BLOCK for block " + block);
+              + remoteAddress + " for OP_READ_BLOCK for " + block);
         } finally {
           IOUtils.closeStream(out);
         }
@@ -182,7 +187,7 @@ class DataXceiver implements Runnable, F
         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
             "%d", "HDFS_READ", clientName, "%d", 
             datanode.dnRegistration.getStorageID(), block, "%d")
-        : datanode.dnRegistration + " Served block " + block + " to " +
+        : datanode.dnRegistration + " Served " + block + " to " +
             s.getInetAddress();
     try {
       try {
@@ -217,9 +222,8 @@ class DataXceiver implements Runnable, F
        * Earlier version shutdown() datanode if there is disk error.
        */
       LOG.warn(datanode.dnRegistration +  ":Got exception while serving " + 
-          block + " to " +
-                s.getInetAddress() + ":\n" + 
-                StringUtils.stringifyException(ioe) );
+          block + " to " + s.getInetAddress() + ":\n" + 
+          StringUtils.stringifyException(ioe) );
       throw ioe;
     } finally {
       IOUtils.closeStream(out);
@@ -242,9 +246,8 @@ class DataXceiver implements Runnable, F
     //
     Block block = new Block(in.readLong(), 
         dataXceiverServer.estimateBlockSize, in.readLong());
-    LOG.info("Receiving block " + block + 
-             " src: " + remoteAddress +
-             " dest: " + localAddress);
+    LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+        + localAddress);
     int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
     boolean isRecovery = in.readBoolean(); // is this part of recovery?
     String client = Text.readString(in); // working on behalf of this client
@@ -280,7 +283,7 @@ class DataXceiver implements Runnable, F
             replyOut.flush();
           }
           throw new IOException("Access token verification failed, for client "
-              + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
+              + remoteAddress + " for OP_WRITE_BLOCK for " + block);
         } finally {
           IOUtils.closeStream(replyOut);
         }
@@ -308,14 +311,17 @@ class DataXceiver implements Runnable, F
       if (targets.length > 0) {
         InetSocketAddress mirrorTarget = null;
         // Connect to backup machine
+        final String mirrorAddrString = 
+          targets[0].getName(connectToDnViaHostname);
         mirrorNode = targets[0].getName();
-        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+        mirrorTarget = NetUtils.createSocketAddr(mirrorAddrString);
         mirrorSock = datanode.newSocket();
         try {
           int timeoutValue = datanode.socketTimeout +
                              (HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
           int writeTimeout = datanode.socketWriteTimeout + 
                              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+          LOG.debug("Connecting to " + mirrorAddrString);
           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
           mirrorSock.setSoTimeout(timeoutValue);
           mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -373,9 +379,9 @@ class DataXceiver implements Runnable, F
           if (client.length() > 0) {
             throw e;
           } else {
-            LOG.info(datanode.dnRegistration + ":Exception transfering block " +
+            LOG.info(datanode.dnRegistration + ":Exception transferring " +
                      block + " to mirror " + mirrorNode +
-                     ". continuing without the mirror.\n" +
+                     "- continuing without the mirror\n" +
                      StringUtils.stringifyException(e));
           }
         }
@@ -403,10 +409,8 @@ class DataXceiver implements Runnable, F
       // the block is finalized in the PacketResponder.
       if (client.length() == 0) {
         datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
-        LOG.info("Received block " + block + 
-                 " src: " + remoteAddress +
-                 " dest: " + localAddress +
-                 " of size " + block.getNumBytes());
+        LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+            + localAddress + " size " + block.getNumBytes());
       }
 
       if (datanode.blockScanner != null) {
@@ -446,7 +450,7 @@ class DataXceiver implements Runnable, F
           out.flush();
           throw new IOException(
               "Access token verification failed, for client " + remoteAddress
-                  + " for OP_BLOCK_CHECKSUM for block " + block);
+                  + " for OP_BLOCK_CHECKSUM for " + block);
         } finally {
           IOUtils.closeStream(out);
         }
@@ -504,7 +508,7 @@ class DataXceiver implements Runnable, F
             BlockTokenSecretManager.AccessMode.COPY);
       } catch (InvalidToken e) {
         LOG.warn("Invalid access token in request from "
-            + remoteAddress + " for OP_COPY_BLOCK for block " + block);
+            + remoteAddress + " for OP_COPY_BLOCK for " + block);
         sendResponse(s,
             (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
             datanode.socketWriteTimeout);
@@ -514,7 +518,7 @@ class DataXceiver implements Runnable, F
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.info("Not able to copy block " + blockId + " to " 
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded");
       sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
           datanode.socketWriteTimeout);
       return;
@@ -544,7 +548,7 @@ class DataXceiver implements Runnable, F
       datanode.myMetrics.incrBytesRead((int) read);
       datanode.myMetrics.incrBlocksRead();
       
-      LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
+      LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
     } catch (IOException ioe) {
       isOpSuccess = false;
       throw ioe;
@@ -608,9 +612,11 @@ class DataXceiver implements Runnable, F
     
     try {
       // get the output stream to the proxy
-      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
-          proxySource.getName());
+      final String proxyAddrString = 
+        proxySource.getName(connectToDnViaHostname);
+      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(proxyAddrString);
       proxySock = datanode.newSocket();
+      LOG.debug("Connecting to " + proxyAddrString);
       NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
       proxySock.setSoTimeout(datanode.socketTimeout);
 
@@ -633,11 +639,11 @@ class DataXceiver implements Runnable, F
       short status = proxyReply.readShort();
       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
         if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
-          throw new IOException("Copy block " + block + " from "
+          throw new IOException("Copy " + block + " from "
               + proxySock.getRemoteSocketAddress()
               + " failed due to access token error");
         }
-        throw new IOException("Copy block " + block + " from "
+        throw new IOException("Copy " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
       // open a block receiver and check if the block does not exist
@@ -653,7 +659,7 @@ class DataXceiver implements Runnable, F
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
 
-      LOG.info("Moved block " + block + 
+      LOG.info("Moved " + block + 
           " from " + s.getRemoteSocketAddress());
       
     } catch (IOException ioe) {
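
Several hunks above replace targets[i].getName() with
targets[i].getName(connectToDnViaHostname). A sketch of the intent, with a
hypothetical helper standing in for the DatanodeID accessor (only
getName(boolean) itself appears in this diff):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.net.NetUtils;

    class AddrSelectionSketch {
      // Hypothetical stand-in: prefer the hostname form of the address when
      // the use-datanode-hostname flag is set, otherwise the IP form, then
      // resolve it with NetUtils exactly as the xceiver code does above.
      static InetSocketAddress pick(String hostName, String ip, int port,
          boolean connectToDnViaHostname) {
        String target = (connectToDnViaHostname ? hostName : ip) + ":" + port;
        return NetUtils.createSocketAddr(target);
      }
    }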

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Fri Jun 21 06:37:27 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -63,7 +64,7 @@ class DataXceiverServer implements Runna
    * It limits the number of block moves for balancing and
    * the total amount of bandwidth they can use.
    */
-  static class BlockBalanceThrottler extends BlockTransferThrottler {
+  static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
    
    /**Constructor
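
BlockBalanceThrottler now builds on the shared DataTransferThrottler from
org.apache.hadoop.hdfs.util. A minimal sketch of using that throttler on
its own, assuming its branch-1 shape (a bytes-per-second constructor and a
blocking throttle(long) call):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    class ThrottledCopySketch {
      // Copy a stream while staying under a bytes-per-second budget
      static void copy(InputStream in, OutputStream out, long bytesPerSec)
          throws IOException {
        DataTransferThrottler throttler =
            new DataTransferThrottler(bytesPerSec);
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = in.read(buf)) > 0) {
          out.write(buf, 0, n);
          throttler.throttle(n); // blocks until the period admits n bytes
        }
      }
    }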

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java Fri Jun 21 06:37:27 2013
@@ -109,15 +109,15 @@ class DatanodeBlockInfo {
       return false;
     }
     if (file == null || volume == null) {
-      throw new IOException("detachBlock:Block not found. " + block);
+      throw new IOException("detachBlock: not found " + block);
     }
     File meta = FSDataset.getMetaFile(file, block);
     if (meta == null) {
-      throw new IOException("Meta file not found for block " + block);
+      throw new IOException("Meta file not found for " + block);
     }
 
     if (HardLink.getLinkCount(file) > numLinks) {
-      DataNode.LOG.info("CopyOnWrite for block " + block);
+      DataNode.LOG.info("CopyOnWrite for " + block);
       detachFile(file, block);
     }
     if (HardLink.getLinkCount(meta) > numLinks) {

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Jun 21 06:37:27 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -356,7 +357,7 @@ public class FSDataset implements FSCons
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
       this.dataDir = new FSDir(currentDir);
       this.currentDir = currentDir;
-      boolean supportAppends = conf.getBoolean("dfs.support.append", false);
+      boolean durableSync = conf.getBoolean("dfs.durable.sync", true);
       File parent = currentDir.getParentFile();
 
       this.detachDir = new File(parent, "detach");
@@ -376,7 +377,7 @@ public class FSDataset implements FSCons
       // should not be deleted.
       blocksBeingWritten = new File(parent, "blocksBeingWritten");
       if (blocksBeingWritten.exists()) {
-        if (supportAppends) {  
+        if (durableSync) {  
           recoverBlocksBeingWritten(blocksBeingWritten);
         } else {
           FileUtil.fullyDelete(blocksBeingWritten);
@@ -573,7 +574,7 @@ public class FSDataset implements FSCons
             // add this block to block set
             blockSet.add(block);
             if (DataNode.LOG.isDebugEnabled()) {
-              DataNode.LOG.debug("recoverBlocksBeingWritten for block " + block);
+              DataNode.LOG.debug("recoverBlocksBeingWritten for " + block);
             }
           }
         }
@@ -774,7 +775,7 @@ public class FSDataset implements FSCons
         volumes = fsvs; // replace array of volumes
       }
       Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size + 
-          "volumes. List of current volumes: " +   toString());
+          " volumes. Current volumes: " +   toString());
       
       return removed_vols;
     }
@@ -1077,7 +1078,7 @@ public class FSDataset implements FSCons
   /**
    * Get File name for a given block.
    */
-  public synchronized File getBlockFile(Block b) throws IOException {
+  public File getBlockFile(Block b) throws IOException {
     File f = validateBlockFile(b);
     if(f == null) {
       if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -1098,27 +1099,60 @@ public class FSDataset implements FSCons
     return info;
   }
   
-  public synchronized InputStream getBlockInputStream(Block b) throws IOException {
-    if (isNativeIOAvailable) {
-      return NativeIO.getShareDeleteFileInputStream(getBlockFile(b));
-    } else {
-      return new FileInputStream(getBlockFile(b));
+  public InputStream getBlockInputStream(Block b) throws IOException {
+    File f = getBlockFileNoExistsCheck(b);
+    try {
+      if (isNativeIOAvailable) {
+        return NativeIO.getShareDeleteFileInputStream(f);
+      } else {
+        return new FileInputStream(f);
+      }
+    } catch (FileNotFoundException fnfe) {
+      throw new IOException("Block " + b + " is not valid. "
+          + "Expected block file at " + f + " does not exist.");
     }
   }
 
-  public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
-    File blockFile = getBlockFile(b);
-    if (isNativeIOAvailable) {
-      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
-    } else {
-      RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
-      if (seekOffset > 0) {
-        blockInFile.seek(seekOffset);
+  /**
+   * Return the File associated with a block, without first checking that it
+   * exists. This should be used when the next operation is going to open the
+   * file for read anyway, and thus the exists check is redundant.
+   */
+  private File getBlockFileNoExistsCheck(Block b) throws IOException {
+    File f = getFile(b);
+    if (f == null) {
+      throw new IOException("Block " + b + " is not valid");
+    }
+    return f;
+  }
+
+  public InputStream getBlockInputStream(Block b, long seekOffset)
+      throws IOException {
+    File blockFile = getBlockFileNoExistsCheck(b);
+    try {
+      if (isNativeIOAvailable) {
+        return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+      } else {
+        RandomAccessFile blockInFile;
+        try {
+          blockInFile = new RandomAccessFile(blockFile, "r");
+        } catch (FileNotFoundException fnfe) {
+          throw new IOException("Block " + b + " is not valid. "
+              + "Expected block file at " + blockFile + " does not exist.");
+        }
+
+        if (seekOffset > 0) {
+          blockInFile.seek(seekOffset);
+        }
+        return new FileInputStream(blockInFile.getFD());
       }
-      return new FileInputStream(blockInFile.getFD());
+    } catch (FileNotFoundException fnfe) {
+      throw new IOException("Block " + b + " is not valid. "
+          + "Expected block file at " + blockFile + " does not exist.");
     }
   }
 
   /**
    * Returns handles to the block file and its metadata file
    */
@@ -1467,7 +1501,7 @@ public class FSDataset implements FSCons
         volumeMap.put(b, new DatanodeBlockInfo(v, f));
       } else {
         // reopening block for appending to it.
-        DataNode.LOG.info("Reopen Block for append " + b);
+        DataNode.LOG.info("Reopen for append " + b);
         v = volumeMap.get(b).getVolume();
         f = createTmpFile(v, b, replicationRequest);
         File blkfile = getBlockFile(b);
@@ -1486,19 +1520,18 @@ public class FSDataset implements FSCons
         DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
         if (!blkfile.renameTo(f)) {
           if (!f.delete()) {
-            throw new IOException("Block " + b + " reopen failed. " +
+            throw new IOException(b + " reopen failed. " +
                                   " Unable to remove file " + f);
           }
           if (!blkfile.renameTo(f)) {
-            throw new IOException("Block " + b + " reopen failed. " +
+            throw new IOException(b + " reopen failed. " +
                                   " Unable to move block file " + blkfile +
                                   " to tmp dir " + f);
           }
         }
       }
       if (f == null) {
-        DataNode.LOG.warn("Block " + b + " reopen failed " +
-                          " Unable to locate tmp file.");
+        DataNode.LOG.warn(b + " reopen failed. Unable to locate tmp file");
         throw new IOException("Block " + b + " reopen failed " +
                               " Unable to locate tmp file.");
       }
@@ -1739,9 +1772,10 @@ public class FSDataset implements FSCons
       long st = System.currentTimeMillis();
       // broken out to a static method to simplify testing
       reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
-      DataNode.LOG.info(
-          "Reconciled asynchronous block report against current state in " +
-          (System.currentTimeMillis() - st) + " ms");
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("Reconciled block report with current state in "
+                + (System.currentTimeMillis() - st) + "ms");
+      }
       
       blockReport = seenOnDisk.keySet();
     }
@@ -2245,7 +2279,6 @@ public class FSDataset implements FSCons
           waitForReportRequest();
           assert requested && scan == null;
           
-          DataNode.LOG.info("Starting asynchronous block report scan");
           long st = System.currentTimeMillis();
           HashMap<Block, File> result = fsd.roughBlockScan();
           DataNode.LOG.info("Finished asynchronous block report scan in "

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Fri Jun 21 06:37:27 2013
@@ -130,7 +130,7 @@ class FSDatasetAsyncDiskService {
       // clear the executor map so that calling execute again will fail.
       executors = null;
       
-      LOG.info("All async disk service threads have been shut down.");
+      LOG.info("All async disk service threads have been shut down");
     }
   }
 
@@ -140,7 +140,7 @@ class FSDatasetAsyncDiskService {
    */
   void deleteAsync(FSDataset.FSVolume volume, File blockFile,
       File metaFile, long dfsBytes, String blockName) {
-    DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
+    DataNode.LOG.info("Scheduling " + blockName + " file " + blockFile
         + " for deletion");
     ReplicaFileDeleteTask deletionTask = 
         new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
@@ -175,18 +175,18 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockName + " with block file " + blockFile
+      return "deletion of " + blockName + " with file " + blockFile
           + " and meta file " + metaFile + " from volume " + volume;
     }
 
     @Override
     public void run() {
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
-        DataNode.LOG.warn("Unexpected error trying to delete block "
+        DataNode.LOG.warn("Unexpected error trying to delete "
             + blockName + " at file " + blockFile + ". Ignored.");
       } else {
         volume.decDfsUsed(dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+        DataNode.LOG.info("Deleted " + blockName + " at file " + blockFile);
       }
     }
   };

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java Fri Jun 21 06:37:27 2013
@@ -54,7 +54,7 @@ class UpgradeManagerDatanode extends Upg
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
         + dataNode.dnRegistration.getName() 
         + " version " + getUpgradeVersion() + " to current LV " 
-        + FSConstants.LAYOUT_VERSION + " is initialized.");
+        + FSConstants.LAYOUT_VERSION + " is initialized");
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
     curUO.setDatanode(dataNode);
     upgradeState = curUO.preUpgradeAction(nsInfo);
@@ -99,7 +99,7 @@ class UpgradeManagerDatanode extends Upg
       DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
           + getUpgradeVersion() + " to current LV " 
           + FSConstants.LAYOUT_VERSION + " cannot be started. "
-          + "The upgrade object is not defined.");
+          + "The upgrade object is not defined");
       return false;
     }
     upgradeState = true;
@@ -111,7 +111,7 @@ class UpgradeManagerDatanode extends Upg
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
         + dataNode.dnRegistration.getName() 
         + " version " + getUpgradeVersion() + " to current LV " 
-        + FSConstants.LAYOUT_VERSION + " is started.");
+        + FSConstants.LAYOUT_VERSION + " is started");
     return true;
   }
 
@@ -141,7 +141,7 @@ class UpgradeManagerDatanode extends Upg
     DataNode.LOG.info("\n   Distributed upgrade for DataNode " 
         + dataNode.dnRegistration.getName() 
         + " version " + getUpgradeVersion() + " to current LV " 
-        + FSConstants.LAYOUT_VERSION + " is complete.");
+        + FSConstants.LAYOUT_VERSION + " is complete");
   }
 
   synchronized void shutdownUpgrade() {

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java Fri Jun 21 06:37:27 2013
@@ -109,7 +109,7 @@ public abstract class UpgradeObjectDatan
     if(getUpgradeStatus() < 100) {
       DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
           + getVersion() + " to current LV " 
-          + FSConstants.LAYOUT_VERSION + " cannot be completed.");
+          + FSConstants.LAYOUT_VERSION + " cannot be completed");
     }
 
     // Complete the upgrade by calling the manager method

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Jun 21 06:37:27 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,7 +38,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -341,31 +339,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final DFSDataInputStream dis = in;
-      final StreamingOutput streaming = new StreamingOutput() {
-        @Override
-        public void write(final OutputStream out) throws IOException {
-          final Long n = length.getValue();
-          DFSDataInputStream dfsin = dis;
-          DFSClient client = dfsclient;
-          try {
-            if (n == null) {
-              IOUtils.copyBytes(dfsin, out, b);
-            } else {
-              IOUtils.copyBytes(dfsin, out, n, b, false);
-            }
-            dfsin.close();
-            dfsin = null;
-            client.close();
-            client = null;
-          } finally {
-            IOUtils.cleanup(LOG, dfsin);
-            IOUtils.cleanup(LOG, client);
-          }
-        }
-      };
-
-      return Response.ok(streaming).type(
+
+      final long n = length.getValue() != null ? length.getValue()
+          : in.getVisibleLength();
+      return Response.ok(new OpenEntity(in, n, dfsclient)).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A response entity for a DFSDataInputStream.
+ */
+public class OpenEntity {
+  private final DFSDataInputStream in;
+  private final long length;
+  private final DFSClient dfsclient;
+  
+  OpenEntity(final DFSDataInputStream in, final long length,
+      final DFSClient dfsclient) {
+    this.in = in;
+    this.length = length;
+    this.dfsclient = dfsclient;
+  }
+  
+  /**
+   * A {@link MessageBodyWriter} for {@link OpenEntity}.
+   */
+  @Provider
+  public static class Writer implements MessageBodyWriter<OpenEntity> {
+
+    @Override
+    public boolean isWriteable(Class<?> clazz, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return clazz == OpenEntity.class
+          && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
+    }
+
+    @Override
+    public long getSize(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return e.length;
+    }
+
+    @Override
+    public void writeTo(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType,
+        MultivaluedMap<String, Object> httpHeaders, OutputStream out
+        ) throws IOException {
+      try {
+        IOUtils.copyBytes(e.in, out, e.length, 4096, false);
+      } finally {
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in);
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient);
+      }
+    }
+  }
+}
\ No newline at end of file

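For context, the new OpenEntity replaces the anonymous StreamingOutput with a
JAX-RS MessageBodyWriter: the runtime discovers the @Provider whose
isWriteable() matches the response entity, uses getSize() to set the
Content-Length header, and then calls writeTo() to stream the file body. A
minimal self-contained sketch of the same pattern (the StreamEntity name and
the manual copy loop are illustrative, not code from this commit):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.lang.annotation.Annotation;
    import java.lang.reflect.Type;

    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.MultivaluedMap;
    import javax.ws.rs.ext.MessageBodyWriter;
    import javax.ws.rs.ext.Provider;

    /** Entity wrapping a stream with a known length, mirroring OpenEntity. */
    class StreamEntity {
      final InputStream in;
      final long length;

      StreamEntity(InputStream in, long length) {
        this.in = in;
        this.length = length;
      }

      @Provider
      public static class Writer implements MessageBodyWriter<StreamEntity> {
        @Override
        public boolean isWriteable(Class<?> clazz, Type genericType,
            Annotation[] annotations, MediaType mediaType) {
          return clazz == StreamEntity.class
              && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
        }

        @Override
        public long getSize(StreamEntity e, Class<?> type, Type genericType,
            Annotation[] annotations, MediaType mediaType) {
          return e.length; // becomes the Content-Length header
        }

        @Override
        public void writeTo(StreamEntity e, Class<?> type, Type genericType,
            Annotation[] annotations, MediaType mediaType,
            MultivaluedMap<String, Object> httpHeaders, OutputStream out)
            throws IOException {
          try {
            // copy exactly e.length bytes in 4 KB chunks
            final byte[] buf = new byte[4096];
            long remaining = e.length;
            while (remaining > 0) {
              int n = e.in.read(buf, 0, (int) Math.min(buf.length, remaining));
              if (n < 0) {
                break;
              }
              out.write(buf, 0, n);
              remaining -= n;
            }
          } finally {
            e.in.close(); // the writer owns the stream, as OpenEntity does
          }
        }
      }
    }

One advantage over StreamingOutput is visible in getSize(): because the entity
carries its length, the response can advertise an exact Content-Length instead
of falling back to chunked transfer encoding.
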
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Fri Jun 21 06:37:27 2013
@@ -58,6 +58,10 @@ class BlocksMap {
     INodeFile getINode() {
       return inode;
     }
+    
+    void setINode(INodeFile inode) {
+      this.inode = inode;
+    }
 
     DatanodeDescriptor getDatanode(int index) {
       assert this.triplets != null : "BlockInfo is not initialized";
@@ -311,39 +315,13 @@ class BlocksMap {
   private GSet<Block, BlockInfo> blocks;
 
   BlocksMap(int initialCapacity, float loadFactor) {
-    this.capacity = computeCapacity();
+    // Use 2% of total memory to size the GSet capacity
+    this.capacity = LightWeightGSet.computeCapacity(2.0, "BlocksMap");
     this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
   }
 
-  /**
-   * Let t = 2% of max memory.
-   * Let e = round(log_2 t).
-   * Then, we choose capacity = 2^e/(size of reference),
-   * unless it is outside the close interval [1, 2^30].
-   */
-  private static int computeCapacity() {
-    //VM detection
-    //See http://java.sun.com/docs/hotspot/HotSpotFAQ.html#64bit_detection
-    final String vmBit = System.getProperty("sun.arch.data.model");
-
-    //2% of max memory
-    final double twoPC = Runtime.getRuntime().maxMemory()/50.0;
-
-    //compute capacity
-    final int e1 = (int)(Math.log(twoPC)/Math.log(2.0) + 0.5);
-    final int e2 = e1 - ("32".equals(vmBit)? 2: 3);
-    final int exponent = e2 < 0? 0: e2 > 30? 30: e2;
-    final int c = 1 << exponent;
-
-    LightWeightGSet.LOG.info("VM type       = " + vmBit + "-bit");
-    LightWeightGSet.LOG.info("2% max memory = " + twoPC/(1 << 20) + " MB");
-    LightWeightGSet.LOG.info("capacity      = 2^" + exponent
-        + " = " + c + " entries");
-    return c;
-  }
-
   void close() {
-    blocks = null;
+    blocks.clear();
   }
 
   /**

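The removed computeCapacity() above documents the sizing rule that the shared
LightWeightGSet.computeCapacity(2.0, "BlocksMap") call is now assumed to
implement: take t = 2% of max heap, let e = round(log_2 t), and use
capacity = 2^e / (reference size), clamped to [1, 2^30]. A standalone sketch
of that arithmetic, generalized to an arbitrary percentage:

    public class GSetCapacitySketch {
      static int computeCapacity(double percentage) {
        // 32-bit JVMs use 4-byte references (2^2), others 8-byte (2^3)
        final String vmBit = System.getProperty("sun.arch.data.model");
        final double portion =
            Runtime.getRuntime().maxMemory() * percentage / 100.0;
        final int e1 = (int) (Math.log(portion) / Math.log(2.0) + 0.5);
        final int e2 = e1 - ("32".equals(vmBit) ? 2 : 3);
        final int exponent = e2 < 0 ? 0 : e2 > 30 ? 30 : e2;
        return 1 << exponent; // always a power of two
      }

      public static void main(String[] args) {
        // e.g. with -Xmx1g on a 64-bit JVM: 2% is ~20 MB, so roughly 2^21 entries
        System.out.println("capacity = " + computeCapacity(2.0));
      }
    }
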
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jun 21 06:37:27 2013
@@ -29,7 +29,15 @@ public interface FSClusterStats {
    * @return a count of the total number of block transfers and block
 *         writes that are currently occurring on the cluster.
    */
+  public int getTotalLoad();
 
-  public int getTotalLoad() ;
+  /**
+   * Indicate whether the cluster is currently avoiding the use of stale
+   * DataNodes for writing.
+   * 
+   * @return true if the cluster is currently avoiding stale DataNodes as
+   *         write targets; false otherwise.
+   */
+  public boolean shouldAvoidStaleDataNodesForWrite();
 }
-    
\ No newline at end of file
+    

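FSClusterStats is presumably implemented by FSNamesystem; the new method lets
block placement consult cluster-wide state before picking write targets. A toy
implementation showing the extended contract (names here are illustrative):

    public class ToyClusterStats implements FSClusterStats {
      private volatile int totalLoad = 0;
      private volatile boolean avoidStaleNodesForWrite = false;

      @Override
      public int getTotalLoad() {
        return totalLoad; // current transfers + writes across the cluster
      }

      @Override
      public boolean shouldAvoidStaleDataNodesForWrite() {
        // typically flips to false when too many DataNodes look stale,
        // so that writes are not starved of targets
        return avoidStaleNodesForWrite;
      }
    }
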
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jun 21 06:37:27 2013
@@ -176,16 +176,13 @@ class FSDirectory implements FSConstants
       newNode = addNode(path, newNode, -1, false);
     }
     if (newNode == null) {
-      NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
-                                   +"failed to add "+path
-                                   +" to the file system");
+      NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
       return null;
     }
     // add create file record to log, record new generation stamp
     fsImage.getEditLog().logOpenFile(path, newNode);
 
-    NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-                                  +path+" is added to the file system");
+    NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
     return newNode;
   }
 
@@ -291,7 +288,7 @@ class FSDirectory implements FSConstants
 
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
                                     + path + " with " + block
-                                    + " block is added to the in-memory "
+                                    + " is added to the in-memory "
                                     + "file system");
     }
     return block;
@@ -308,7 +305,7 @@ class FSDirectory implements FSConstants
       fsImage.getEditLog().logOpenFile(path, file);
       NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
                                     +path+" with "+ file.getBlocks().length 
-                                    +" blocks is persisted to the file system");
+                                    +" blocks is persisted");
     }
   }
 
@@ -323,7 +320,7 @@ class FSDirectory implements FSConstants
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
                                     +path+" with "+ file.getBlocks().length 
-                                    +" blocks is persisted to the file system");
+                                    +" blocks is persisted");
       }
     }
   }
@@ -345,8 +342,7 @@ class FSDirectory implements FSConstants
       // write modified block locations to log
       fsImage.getEditLog().logOpenFile(path, fileNode);
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-                                    +path+" with "+block
-                                    +" block is added to the file system");
+          + path + " with "+ block +" is added to the");
       // update space consumed
       INode[] pathINodes = getExistingPathINodes(path);
       updateCount(pathINodes, pathINodes.length-1, 0,
@@ -589,21 +585,90 @@ class FSDirectory implements FSConstants
       }
     }
   }
+
+  /**
+   * Concat - see {@link #unprotectedConcat(String, String[], long)}
+   */
+  public void concat(String target, String [] srcs) throws IOException{
+    synchronized(rootDir) {
+      // actual move
+      waitForReady();
+      long timestamp = FSNamesystem.now();
+      unprotectedConcat(target, srcs, timestamp);
+      // do the commit
+      fsImage.getEditLog().logConcat(target, srcs, timestamp);
+    }
+  }
+
+  /**
+   * Moves all the blocks from {@code srcs} in the order of {@code srcs} array
+   * and appends them to {@code target}.
+   * The {@code srcs} files are deleted.
+   * @param target file to move the blocks to
+   * @param srcs list of files to move the blocks from
+   * Must be public because it is also called when loading edit logs.
+   * NOTE: it does not update quota, since concat is restricted to files in
+   * the same directory.
+   */
+  public void unprotectedConcat(String target, String [] srcs, long timestamp) {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+    }
+    // do the move
     
+    final INode[] trgINodes = getExistingPathINodes(target);
+    INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length - 1];
+    INodeDirectory trgParent = (INodeDirectory) trgINodes[trgINodes.length - 2];
+    
+    INodeFile[] allSrcInodes = new INodeFile[srcs.length];
+    int i = 0;
+    int totalBlocks = 0;
+    for (String src : srcs) {
+      INodeFile srcInode = getFileINode(src);
+      allSrcInodes[i++] = srcInode;
+      totalBlocks += srcInode.blocks.length;
+    }
+    trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+    
+    // since we are in the same dir - we can use same parent to remove files
+    int count = 0;
+    for (INodeFile nodeToRemove : allSrcInodes) {
+      if (nodeToRemove == null) continue;
+      
+      nodeToRemove.blocks = null;
+      trgParent.removeChild(nodeToRemove);
+      count++;
+    }
+
+    trgInode.setModificationTime(timestamp);
+    trgParent.setModificationTime(timestamp);
+    // update quota on the parent directory ('count' files removed, 0 space)
+    unprotectedUpdateCount(trgINodes, trgINodes.length-1, -count, 0);
+  }
+
   /**
-   * Remove the file from management, return blocks
+   * Delete the target directory and collect the blocks under it.
+   * 
+   * @param src
+   *          Path of a directory to delete
+   * @param collectedBlocks
+   *          Blocks under the deleted directory
+   * @return true on successful deletion; else false
    */
-  boolean delete(String src) {
+  boolean delete(String src, List<Block> collectedBlocks) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
     waitForReady();
     long now = FSNamesystem.now();
-    int filesRemoved = unprotectedDelete(src, now);
+    int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
     if (filesRemoved <= 0) {
       return false;
     }
     incrDeletedFileCount(filesRemoved);
+    // Blocks will be deleted later by the caller of this method
+    FSNamesystem.getFSNamesystem().removePathAndBlocks(src, null);
     fsImage.getEditLog().logDelete(src, now);
     return true;
   }
@@ -625,14 +690,36 @@ class FSDirectory implements FSConstants
   }
   
   /**
-   * Delete a path from the name space
-   * Update the count at each ancestor directory with quota
-   * @param src a string representation of a path to an inode
-   * @param modificationTime the time the inode is removed
-   * @param deletedBlocks the place holder for the blocks to be removed
+   * Delete a path from the name space. Update the count at each ancestor
+   * directory with quota.
+   * 
+   * @param src
+   *          a string representation of a path to an inode
+   * 
+   * @param mTime
+   *          the time the inode is removed
+   */
+  void unprotectedDelete(String src, long mTime) {
+    List<Block> collectedBlocks = new ArrayList<Block>();
+    int filesRemoved = unprotectedDelete(src, collectedBlocks, mTime);
+    if (filesRemoved > 0) {
+      namesystem.removePathAndBlocks(src, collectedBlocks);
+    }
+  }
+  
+  /**
+   * Delete a path from the name space. Update the count at each ancestor
+   * directory with quota.
+   * 
+   * @param src
+   *          a string representation of a path to an inode
+   * @param collectedBlocks
+   *          blocks collected from the deleted path
+   * @param mtime
+   *          the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
-   */ 
-  int unprotectedDelete(String src, long modificationTime) {
+   */
+  int unprotectedDelete(String src, List<Block> collectedBlocks, long mtime) {
     src = normalizePath(src);
 
     synchronized (rootDir) {
@@ -643,32 +730,27 @@ class FSDirectory implements FSConstants
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
             +"failed to remove "+src+" because it does not exist");
         return 0;
-      } else if (inodes.length == 1) { // src is the root
+      } 
+      if (inodes.length == 1) { // src is the root
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
             "failed to remove " + src +
             " because the root is not allowed to be deleted");
         return 0;
-      } else {
-        try {
-          // Remove the node from the namespace
-          removeChild(inodes, inodes.length-1);
-          // set the parent's modification time
-          inodes[inodes.length-2].setModificationTime(modificationTime);
-          // GC all the blocks underneath the node.
-          ArrayList<Block> v = new ArrayList<Block>();
-          int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
-          namesystem.removePathAndBlocks(src, v);
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-              +src+" is removed");
-          }
-          return filesRemoved;
-        } catch (IOException e) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-              "failed to remove " + src + " because " + e.getMessage());
-          return 0;
-        }
+      } 
+      int pos = inodes.length - 1;
+      targetNode = removeChild(inodes, pos);
+      if (targetNode == null) {
+        return 0;
+      }
+      // set the parent's modification time
+      inodes[pos - 1].setModificationTime(mtime);
+      int filesRemoved = targetNode
+          .collectSubtreeBlocksAndClear(collectedBlocks);
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+            + src + " is removed");
       }
+      return filesRemoved;
     }
   }
 
@@ -892,6 +974,25 @@ class FSDirectory implements FSConstants
     }
   }
   
+  /**
+   * Updates quota without verification; it is the caller's responsibility to
+   * make sure the quota is not exceeded.
+   * @param inodes
+   * @param numOfINodes
+   * @param nsDelta
+   * @param dsDelta
+   */
+  void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
+      long nsDelta, long dsDelta) {
+    for (int i = 0; i < numOfINodes; i++) {
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        INodeDirectoryWithQuota node = (INodeDirectoryWithQuota) inodes[i];
+        node.addSpaceConsumed(nsDelta, dsDelta);
+      }
+    }
+  }
+
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
@@ -1399,4 +1500,8 @@ class FSDirectory implements FSConstants
       inode.setLocalName(name.getBytes());
     }
   }
+  
+  void shutdown() {
+    nameCache.reset();
+  }
 }

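The net effect of unprotectedConcat can be modeled without any HDFS types: the
sources' block lists are appended to the target in srcs order, the source
files vanish from the shared parent, and the parent's namespace count drops by
the number of removed files while disk usage is unchanged. A self-contained
toy model of that semantics (illustrative only, not code from this commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ConcatSketch {
      /** dir maps file name to its ordered block list. */
      static void concat(Map<String, List<String>> dir,
                         String target, String[] srcs) {
        final List<String> trgBlocks = dir.get(target);
        for (String src : srcs) {
          // move the blocks and drop the source file, as unprotectedConcat does
          trgBlocks.addAll(dir.remove(src));
        }
      }

      public static void main(String[] args) {
        Map<String, List<String>> dir = new LinkedHashMap<String, List<String>>();
        dir.put("target", new ArrayList<String>(Arrays.asList("b0")));
        dir.put("s1", new ArrayList<String>(Arrays.asList("b1", "b2")));
        dir.put("s2", new ArrayList<String>(Arrays.asList("b3")));
        concat(dir, "target", new String[] {"s1", "s2"});
        System.out.println(dir); // {target=[b0, b1, b2, b3]}
      }
    }
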
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jun 21 06:37:27 2013
@@ -86,6 +86,7 @@ public class FSEditLog {
   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+  private static final byte OP_CONCAT_DELETE = 16; // concat files.
   private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
   private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
   private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
@@ -650,8 +651,8 @@ public class FSEditLog {
    * This is where we apply edits that we've been writing to disk all
    * along.
    */
-  static int loadFSEdits(EditLogInputStream edits, int tolerationLength
-      ) throws IOException {
+  static int loadFSEdits(EditLogInputStream edits, int tolerationLength,
+      MetaRecoveryContext recovery) throws IOException {
     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
@@ -664,10 +665,13 @@ public class FSEditLog {
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
         numOpTimes = 0, numOpGetDelegationToken = 0,
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
-        numOpUpdateMasterKey = 0, numOpOther = 0;
-
+        numOpUpdateMasterKey = 0, numOpOther = 0,
+        numOpConcatDelete = 0;
+    long highestGenStamp = -1;
     long startTime = FSNamesystem.now();
 
+    LOG.info("Start loading edits file " + edits.getName());
+    //
     // Keep track of the file offsets of the last several opcodes.
     // This is handy when manually recovering corrupted edits files.
     PositionTrackingInputStream tracker = 
@@ -840,7 +844,28 @@ public class FSEditLog {
           short replication = adjustReplication(readShort(in));
           fsDir.unprotectedSetReplication(path, replication, null);
           break;
-        } 
+        }
+        case OP_CONCAT_DELETE: {
+          if (logVersion > -22) {
+            throw new IOException("Unexpected opcode " + opcode
+                + " for version " + logVersion);
+          }
+          numOpConcatDelete++;
+          int length = in.readInt();
+          if (length < 3) { // trg, srcs.., timestamp
+            throw new IOException(
+                "Incorrect data format for ConcatDelete operation.");
+          }
+          String trg = FSImage.readString(in);
+          int srcSize = length - 1 - 1; // all entries minus trg and timestamp
+          String[] srcs = new String[srcSize];
+          for (int i = 0; i < srcSize; i++) {
+            srcs[i] = FSImage.readString(in);
+          }
+          timestamp = readLong(in);
+          fsDir.unprotectedConcat(trg, srcs, timestamp);
+          break;
+        }
         case OP_RENAME: {
           numOpRename++;
           int length = in.readInt();
@@ -896,6 +921,11 @@ public class FSEditLog {
         case OP_SET_GENSTAMP: {
           numOpSetGenStamp++;
           long lw = in.readLong();
+          if ((highestGenStamp != -1) && (highestGenStamp + 1 != lw)) {
+            throw new IOException("OP_SET_GENSTAMP tried to set a genstamp of " + lw + 
+              " but the previous highest genstamp was " + highestGenStamp);
+          }
+          highestGenStamp = lw;
           fsDir.namesystem.setGenerationStamp(lw);
           break;
         }
@@ -1068,7 +1098,7 @@ public class FSEditLog {
         in.reset(); //reset to the beginning position of this transaction
       } else {
         //edit log toleration feature is disabled
-        throw new IOException(msg, t);
+        MetaRecoveryContext.editLogLoaderPrompt(msg, recovery);
       }
     } finally {
       try {
@@ -1093,6 +1123,7 @@ public class FSEditLog {
           + " numOpRenewDelegationToken = " + numOpRenewDelegationToken
           + " numOpCancelDelegationToken = " + numOpCancelDelegationToken
           + " numOpUpdateMasterKey = " + numOpUpdateMasterKey
+          + " numOpConcatDelete  = " + numOpConcatDelete
           + " numOpOther = " + numOpOther);
     }
 
@@ -1378,6 +1409,21 @@ public class FSEditLog {
     logEdit(OP_SET_OWNER, new UTF8(src), u, g);
   }
 
+  /**
+   * Add concat(trg, srcs...) record to edit log.
+   */
+  void logConcat(String trg, String [] srcs, long timestamp) {
+    int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+    UTF8[] info = new UTF8[size];
+    int idx = 0;
+    info[idx++] = new UTF8(trg);
+    for (int i = 0; i < srcs.length; i++) {
+      info[idx++] = new UTF8(srcs[i]);
+    }
+    info[idx] = FSEditLog.toLogLong(timestamp);
+    logEdit(OP_CONCAT_DELETE, new ArrayWritable(UTF8.class, info));
+  }
+  
   /** 
    * Add delete file record to edit log
    */
@@ -1538,7 +1584,8 @@ public class FSEditLog {
         // file exists.
         //
         getEditFile(sd).delete();
-        if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {          
+        if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
+          sd.unlock();
           removeEditsForStorageDir(sd);
           fsimage.updateRemovedDirs(sd);
           it.remove();

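Taken together, logConcat() and the OP_CONCAT_DELETE case in loadFSEdits()
imply a record of the form: an int entry count (trg + srcs + timestamp, so at
least 3), the target path, the source paths, and the timestamp, each written
as a UTF8 value (toLogLong encodes the long as a decimal string). A
hypothetical standalone decoder mirroring that layout; the length-prefixed
readString framing below is an assumption standing in for FSImage.readString:

    import java.io.DataInputStream;
    import java.io.IOException;

    public class ConcatDeleteRecord {
      final String trg;
      final String[] srcs;
      final long timestamp;

      ConcatDeleteRecord(String trg, String[] srcs, long timestamp) {
        this.trg = trg;
        this.srcs = srcs;
        this.timestamp = timestamp;
      }

      static ConcatDeleteRecord read(DataInputStream in) throws IOException {
        final int length = in.readInt(); // trg + srcs + timestamp
        if (length < 3) {
          throw new IOException("Incorrect data format for ConcatDelete operation.");
        }
        final String trg = readString(in);
        final String[] srcs = new String[length - 2]; // minus trg and timestamp
        for (int i = 0; i < srcs.length; i++) {
          srcs[i] = readString(in);
        }
        final long timestamp = Long.parseLong(readString(in));
        return new ConcatDeleteRecord(trg, srcs, timestamp);
      }

      // stand-in for the UTF8 framing used by FSImage.readString (assumed)
      private static String readString(DataInputStream in) throws IOException {
        final byte[] b = new byte[in.readUnsignedShort()];
        in.readFully(b);
        return new String(b, "UTF-8");
      }
    }
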
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jun 21 06:37:27 2013
@@ -141,7 +141,7 @@ public class FSImage extends Storage {
 
   /** Flag to restore removed storage directories at checkpointing */
   private boolean restoreRemovedDirs = DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT;
-  
+
   private int editsTolerationLength = DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_DEFAULT;
 
   /**
@@ -375,14 +375,15 @@ public class FSImage extends Storage {
     case REGULAR:
       // just load the image
     }
-    return loadFSImage();
+    return loadFSImage(startOpt.createRecoveryContext());
   }
 
   private void doUpgrade() throws IOException {
+    MetaRecoveryContext recovery = null;
     if(getDistributedUpgradeState()) {
       // only distributed upgrade need to continue
       // don't do version upgrade
-      this.loadFSImage();
+      this.loadFSImage(recovery);
       initializeDistributedUpgrade();
       return;
     }
@@ -398,7 +399,7 @@ public class FSImage extends Storage {
     }
 
     // load the latest image
-    this.loadFSImage();
+    this.loadFSImage(recovery);
 
     // Do upgrade for each directory
     long oldCTime = this.getCTime();
@@ -742,7 +743,7 @@ public class FSImage extends Storage {
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage() throws IOException {
+  boolean loadFSImage(MetaRecoveryContext recovery) throws IOException {
     // Now check all curFiles and see which is the newest
     long latestNameCheckpointTime = Long.MIN_VALUE;
     long latestEditsCheckpointTime = Long.MIN_VALUE;
@@ -822,22 +823,27 @@ public class FSImage extends Storage {
     needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
 
     long startTime = FSNamesystem.now();
-    long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
+    File imageFile = getImageFile(latestNameSD, NameNodeFile.IMAGE);
+    long imageSize = imageFile.length();
 
     //
     // Load in bits
     //
     latestNameSD.read();
-    needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
-    LOG.info("Image file of size " + imageSize + " loaded in " 
+    LOG.info("Start loading image file " + imageFile.getPath().toString());
+    needToSave |= loadFSImage(imageFile);
+    LOG.info("Image file " + imageFile.getPath().toString() +
+        " of size " + imageSize + " bytes loaded in "
         + (FSNamesystem.now() - startTime)/1000 + " seconds.");
     
     // Load latest edits
-    if (latestNameCheckpointTime > latestEditsCheckpointTime)
+    if (latestNameCheckpointTime > latestEditsCheckpointTime) {
       // the image is already current, discard edits
       needToSave |= true;
-    else // latestNameCheckpointTime == latestEditsCheckpointTime
-      needToSave |= (loadFSEdits(latestEditsSD) > 0);
+      FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();
+    } else { // latestNameCheckpointTime == latestEditsCheckpointTime
+      needToSave |= (loadFSEdits(latestEditsSD, recovery) > 0);
+    }
     
     return needToSave;
   }
@@ -1015,16 +1021,17 @@ public class FSImage extends Storage {
    * @return number of edits loaded
    * @throws IOException
    */
-  int loadFSEdits(StorageDirectory sd) throws IOException {
+  int loadFSEdits(StorageDirectory sd, MetaRecoveryContext recovery)
+      throws IOException {
     int numEdits = 0;
     EditLogFileInputStream edits = 
       new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
-    numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength);
+    numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);
     edits.close();
     File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
     if (editsNew.exists() && editsNew.length() > 0) {
       edits = new EditLogFileInputStream(editsNew);
-      numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength);
+      numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);
       edits.close();
     }
     // update the counts.
@@ -1063,8 +1070,9 @@ public class FSImage extends Storage {
       out.close();
     }
 
-    LOG.info("Image file of size " + newFile.length() + " saved in " 
-        + (FSNamesystem.now() - startTime)/1000 + " seconds.");
+    LOG.info("Image file " + newFile + " of size " + newFile.length() +
+        " bytes saved in " + (FSNamesystem.now() - startTime)/1000 +
+        " seconds.");
   }
 
   /**
@@ -1237,7 +1245,7 @@ public class FSImage extends Storage {
     FSEditLog.LOG.info(DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_KEY
         + " = " + editsTolerationLength);
   }  
-
+  
   /** restore a metadata file */
   private static void restoreFile(File src, File dstdir, String dstfile)
       throws IOException {
@@ -1816,9 +1824,9 @@ public class FSImage extends Storage {
   }
 
   static private final UTF8 U_STR = new UTF8();
-  static String readString(DataInputStream in) throws IOException {
+  public static String readString(DataInputStream in) throws IOException {
     U_STR.readFields(in);
-    return U_STR.toString();
+    return U_STR.toStringChecked();
   }
 
   static String readString_EmptyAsNull(DataInputStream in) throws IOException {
@@ -1826,7 +1834,7 @@ public class FSImage extends Storage {
     return s.isEmpty()? null: s;
   }
 
-  static byte[] readBytes(DataInputStream in) throws IOException {
+  public static byte[] readBytes(DataInputStream in) throws IOException {
     U_STR.readFields(in);
     int len = U_STR.getLength();
     byte[] bytes = new byte[len];