Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/06/06 02:18:04 UTC

svn commit: r1346682 [3/9] - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/contrib/bkjournal/ ha...

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Wed Jun  6 00:17:38 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -49,13 +52,19 @@ import com.google.common.annotations.Vis
  */
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private static final String enableDebugLogging =
+    "For more information, please enable DEBUG log level on "
+    + ((Log4JLogger)LOG).getLogger().getName();
+
   private boolean considerLoad; 
   private boolean preferLocalNode = true;
   private NetworkTopology clusterMap;
   private FSClusterStats stats;
-  static final String enableDebugLogging = "For more information, please enable"
-    + " DEBUG level logging on the "
-    + "org.apache.hadoop.hdfs.server.namenode.FSNamesystem logger.";
+  private long heartbeatInterval;   // interval for DataNode heartbeats
+  /**
+   * The number of heartbeat intervals a DataNode may miss before the replica deletion policy prefers it for removal.
+   */
+  private int tolerateHeartbeatMultiplier;
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -71,6 +80,12 @@ public class BlockPlacementPolicyDefault
     this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
+    this.heartbeatInterval = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
+    this.tolerateHeartbeatMultiplier = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
+        DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
   }
 
   private ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -551,24 +566,33 @@ public class BlockPlacementPolicyDefault
                                                  short replicationFactor,
                                                  Collection<DatanodeDescriptor> first, 
                                                  Collection<DatanodeDescriptor> second) {
+    long oldestHeartbeat =
+      now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+    DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
-    DatanodeDescriptor cur = null;
+    DatanodeDescriptor minSpaceNode = null;
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
     Iterator<DatanodeDescriptor> iter =
           first.isEmpty() ? second.iterator() : first.iterator();
 
-    // pick node with least free space
+    // Pick the node with the oldest heartbeat or with the least free space,
+    // if all heartbeats are within the tolerable heartbeat interval
     while (iter.hasNext() ) {
       DatanodeDescriptor node = iter.next();
       long free = node.getRemaining();
+      long lastHeartbeat = node.getLastUpdate();
+      if(lastHeartbeat < oldestHeartbeat) {
+        oldestHeartbeat = lastHeartbeat;
+        oldestHeartbeatNode = node;
+      }
       if (minSpace > free) {
         minSpace = free;
-        cur = node;
+        minSpaceNode = node;
       }
     }
-    return cur;
+    return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
   
   @VisibleForTesting

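As an illustrative aside, the replica-deletion rule introduced above (prefer a node whose last heartbeat is older than heartbeatInterval * tolerateHeartbeatMultiplier, otherwise fall back to the node with the least remaining space) can be sketched as a standalone snippet; the Node type and the sample numbers are hypothetical and not part of this change:

    import java.util.Arrays;
    import java.util.List;

    class ReplicaChoiceSketch {
      static class Node {
        final String name; final long lastHeartbeat; final long remaining;
        Node(String name, long lastHeartbeat, long remaining) {
          this.name = name; this.lastHeartbeat = lastHeartbeat; this.remaining = remaining;
        }
      }

      // Prefer a node whose heartbeat is older than the tolerated window;
      // otherwise fall back to the node with the least free space.
      static Node choose(List<Node> nodes, long now, long heartbeatMs, int tolerateMultiplier) {
        long oldestHeartbeat = now - heartbeatMs * tolerateMultiplier;
        Node oldestHeartbeatNode = null;
        long minSpace = Long.MAX_VALUE;
        Node minSpaceNode = null;
        for (Node node : nodes) {
          if (node.lastHeartbeat < oldestHeartbeat) {
            oldestHeartbeat = node.lastHeartbeat;
            oldestHeartbeatNode = node;
          }
          if (node.remaining < minSpace) {
            minSpace = node.remaining;
            minSpaceNode = node;
          }
        }
        return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Node> nodes = Arrays.asList(
            new Node("dn1", now - 2000L, 50L << 30),      // fresh heartbeat, 50 GB free
            new Node("dn2", now - 600000L, 200L << 30));  // stale heartbeat, 200 GB free
        // With a 3 s heartbeat and a multiplier of 4, dn2 is past the tolerated
        // window and is chosen even though it has more free space than dn1.
        System.out.println(choose(nodes, now, 3000L, 4).name);
      }
    }
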
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jun  6 00:17:38 2012
@@ -100,11 +100,7 @@ public class DatanodeManager {
    * with the same storage id; and </li>
    * <li>removed if and only if an existing datanode is restarted to serve a
    * different storage id.</li>
-   * </ul> <br>
-   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
-   * in the namespace image file. Only the {@link DatanodeInfo} part is 
-   * persistent, the list of blocks is restored from the datanode block
-   * reports. 
+   * </ul> <br> 
    * <p>
    * Mapping: StorageID -> DatanodeDescriptor
    */
@@ -832,7 +828,9 @@ public class DatanodeManager {
 
     if (InetAddresses.isInetAddress(hostStr)) {
       // The IP:port is sufficient for listing in a report
-      dnId = new DatanodeID(hostStr, "", port);
+      dnId = new DatanodeID(hostStr, "", "", port,
+          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
     } else {
       String ipAddr = "";
       try {
@@ -840,7 +838,9 @@ public class DatanodeManager {
       } catch (UnknownHostException e) {
         LOG.warn("Invalid hostname " + hostStr + " in hosts file");
       }
-      dnId = new DatanodeID(ipAddr, hostStr, port);
+      dnId = new DatanodeID(ipAddr, hostStr, "", port,
+          DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
+          DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
     }
     return dnId;
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Wed Jun  6 00:17:38 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.PrintWriter;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -54,10 +53,23 @@ class InvalidateBlocks {
     return numBlocks;
   }
 
-  /** Does this contain the block which is associated with the storage? */
+  /**
+   * @return true if the given storage has the given block listed for
+   * invalidation. Blocks are compared including their generation stamps:
+   * if a block is pending invalidation but with a different generation stamp,
+   * returns false.
+   * @param storageID the storage to check
+   * @param block the block to look for
+   * 
+   */
   synchronized boolean contains(final String storageID, final Block block) {
-    final Collection<Block> s = node2blocks.get(storageID);
-    return s != null && s.contains(block);
+    final LightWeightHashSet<Block> s = node2blocks.get(storageID);
+    if (s == null) {
+      return false; // no invalidate blocks for this storage ID
+    }
+    Block blockInSet = s.getElement(block);
+    return blockInSet != null &&
+        block.getGenerationStamp() == blockInSet.getGenerationStamp();
   }
 
   /**

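A rough standalone illustration of the stricter contains() semantics above, using a plain HashMap in place of HDFS's LightWeightHashSet and a hypothetical Block stand-in; lookups match on block id only, and the stored element's generation stamp is compared afterwards:

    import java.util.HashMap;
    import java.util.Map;

    class InvalidateContainsSketch {
      static class Block {
        final long id; final long genStamp;
        Block(long id, long genStamp) { this.id = id; this.genStamp = genStamp; }
        @Override public boolean equals(Object o) {
          return o instanceof Block && ((Block) o).id == id;   // identity by block id only
        }
        @Override public int hashCode() { return (int) (id ^ (id >>> 32)); }
      }

      // storageID -> (block -> stored block), the value being the stored instance so
      // its generation stamp can be inspected, like LightWeightHashSet.getElement().
      private final Map<String, Map<Block, Block>> node2blocks =
          new HashMap<String, Map<Block, Block>>();

      synchronized void add(String storageID, Block block) {
        Map<Block, Block> s = node2blocks.get(storageID);
        if (s == null) {
          s = new HashMap<Block, Block>();
          node2blocks.put(storageID, s);
        }
        s.put(block, block);
      }

      // True only if the block is pending invalidation with the same generation stamp.
      synchronized boolean contains(String storageID, Block block) {
        Map<Block, Block> s = node2blocks.get(storageID);
        if (s == null) {
          return false;                    // no pending invalidations for this storage
        }
        Block blockInSet = s.get(block);   // lookup by block id
        return blockInSet != null && blockInSet.genStamp == block.genStamp;
      }

      public static void main(String[] args) {
        InvalidateContainsSketch sketch = new InvalidateContainsSketch();
        sketch.add("storage-1", new Block(42, 1000));
        System.out.println(sketch.contains("storage-1", new Block(42, 1000)));  // true
        System.out.println(sketch.contains("storage-1", new Block(42, 1001)));  // false
      }
    }
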
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java Wed Jun  6 00:17:38 2012
@@ -19,26 +19,20 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.fs.ContentSummary;
-
 /** 
  * This interface is used by the block manager to expose a
  * few characteristics of a collection of Block/BlockUnderConstruction.
  */
 public interface MutableBlockCollection extends BlockCollection {
   /**
-   * Set block 
+   * Set the block at the given index.
    */
-  public void setBlock(int idx, BlockInfo blk);
+  public void setBlock(int index, BlockInfo blk);
 
   /**
-   * Convert the last block of the collection to an under-construction block.
-   * Set its locations.
+   * Convert the last block of the collection to an under-construction block
+   * and set the locations.
    */
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-                       DatanodeDescriptor[] targets) throws IOException;
+      DatanodeDescriptor[] locations) throws IOException;
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Jun  6 00:17:38 2012
@@ -71,10 +71,12 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+
 @InterfaceAudience.Private
 public class JspHelper {
   public static final String CURRENT_CONF = "current.conf";
-  final static public String WEB_UGI_PROPERTY_NAME = DFSConfigKeys.DFS_WEB_UGI_KEY;
   public static final String DELEGATION_PARAMETER_NAME = DelegationParam.NAME;
   public static final String NAMENODE_ADDRESS = "nnaddr";
   static final String SET_DELEGATION = "&" + DELEGATION_PARAMETER_NAME +
@@ -438,9 +440,9 @@ public class JspHelper {
 
   /** Return a table containing version information. */
   public static String getVersionTable() {
-    return "<div id='dfstable'><table>"       
-        + "\n  <tr><td id='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision()
-        + "\n  <tr><td id='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch()
+    return "<div class='dfstable'><table>"       
+        + "\n  <tr><td class='col1'>Version:</td><td>" + VersionInfo.getVersion() + ", " + VersionInfo.getRevision() + "</td></tr>"
+        + "\n  <tr><td class='col1'>Compiled:</td><td>" + VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + VersionInfo.getBranch() + "</td></tr>"
         + "\n</table></div>";
   }
 
@@ -483,11 +485,12 @@ public class JspHelper {
    */
   public static UserGroupInformation getDefaultWebUser(Configuration conf
                                                        ) throws IOException {
-    String[] strings = conf.getStrings(JspHelper.WEB_UGI_PROPERTY_NAME);
-    if (strings == null || strings.length == 0) {
+    String user = conf.get(
+        HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
+    if (user == null || user.length() == 0) {
       throw new IOException("Cannot determine UGI from request or conf");
     }
-    return UserGroupInformation.createRemoteUser(strings[0]);
+    return UserGroupInformation.createRemoteUser(user);
   }
 
   private static InetSocketAddress getNNServiceAddress(ServletContext context,

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Jun  6 00:17:38 2012
@@ -591,7 +591,8 @@ class BPOfferService {
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-      dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+      String who = "NameNode at " + actor.getNNSocketAddress();
+      dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -608,6 +609,9 @@ class BPOfferService {
       if (bandwidth > 0) {
         DataXceiverServer dxcs =
                      (DataXceiverServer) dn.dataXceiverServer.getRunnable();
+        LOG.info("Updating balance throttler bandwidth from "
+            + dxcs.balanceThrottler.getBandwidth() + " bytes/s "
+            + "to: " + bandwidth + " bytes/s.");
         dxcs.balanceThrottler.setBandwidth(bandwidth);
       }
       break;

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Wed Jun  6 00:17:38 2012
@@ -145,7 +145,7 @@ class BlockPoolManager {
   void refreshNamenodes(Configuration conf)
       throws IOException {
     LOG.info("Refresh request received for nameservices: "
-        + conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES));
+        + conf.get(DFSConfigKeys.DFS_NAMESERVICES));
     
     Map<String, Map<String, InetSocketAddress>> newAddressMap = 
       DFSUtil.getNNServiceRpcAddresses(conf);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jun  6 00:17:38 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -110,6 +111,8 @@ class BlockReceiver implements Closeable
   private final BlockConstructionStage stage;
   private final boolean isTransfer;
 
+  private boolean syncOnClose;
+
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
       final BlockConstructionStage stage, 
@@ -245,14 +248,18 @@ class BlockReceiver implements Closeable
    * close files.
    */
   public void close() throws IOException {
-
     IOException ioe = null;
+    if (syncOnClose && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     // close checksum file
     try {
       if (checksumOut != null) {
         checksumOut.flush();
-        if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
+        if (syncOnClose && (cout instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)cout).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         checksumOut.close();
         checksumOut = null;
@@ -267,8 +274,10 @@ class BlockReceiver implements Closeable
     try {
       if (out != null) {
         out.flush();
-        if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
+        if (syncOnClose && (out instanceof FileOutputStream)) {
+          long start = Util.now();
           ((FileOutputStream)out).getChannel().force(true);
+          datanode.metrics.addFsync(Util.now() - start);
         }
         out.close();
         out = null;
@@ -290,12 +299,25 @@ class BlockReceiver implements Closeable
    * Flush block data and metadata files to disk.
    * @throws IOException
    */
-  void flush() throws IOException {
+  void flushOrSync(boolean isSync) throws IOException {
+    if (isSync && (out != null || checksumOut != null)) {
+      datanode.metrics.incrFsyncCount();      
+    }
     if (checksumOut != null) {
       checksumOut.flush();
+      if (isSync && (cout instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)cout).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
     if (out != null) {
       out.flush();
+      if (isSync && (out instanceof FileOutputStream)) {
+        long start = Util.now();
+        ((FileOutputStream)out).getChannel().force(true);
+        datanode.metrics.addFsync(Util.now() - start);
+      }
     }
   }
 
@@ -533,7 +555,9 @@ class BlockReceiver implements Closeable
       header.getOffsetInBlock(),
       header.getSeqno(),
       header.isLastPacketInBlock(),
-      header.getDataLen(), endOfHeader);
+      header.getDataLen(),
+      header.getSyncBlock(),
+      endOfHeader);
   }
 
   /**
@@ -549,15 +573,19 @@ class BlockReceiver implements Closeable
    * returns the number of data bytes that the packet has.
    */
   private int receivePacket(long offsetInBlock, long seqno,
-      boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
+      boolean lastPacketInBlock, int len, boolean syncBlock,
+      int endOfHeader) throws IOException {
     if (LOG.isDebugEnabled()){
       LOG.debug("Receiving one packet for block " + block +
                 " of length " + len +
                 " seqno " + seqno +
                 " offsetInBlock " + offsetInBlock +
+                " syncBlock " + syncBlock +
                 " lastPacketInBlock " + lastPacketInBlock);
     }
-    
+    // make sure the block gets sync'ed upon close
+    this.syncOnClose |= syncBlock && lastPacketInBlock;
+
     // update received bytes
     long firstByteInBlock = offsetInBlock;
     offsetInBlock += len;
@@ -587,6 +615,10 @@ class BlockReceiver implements Closeable
       if(LOG.isDebugEnabled()) {
         LOG.debug("Receiving an empty packet or the end of the block " + block);
       }
+      // flush unless close() would flush anyway
+      if (syncBlock && !lastPacketInBlock) {
+        flushOrSync(true);
+      }
     } else {
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
@@ -677,8 +709,8 @@ class BlockReceiver implements Closeable
             );
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
-          /// flush entire packet
-          flush();
+          /// flush entire packet, sync unless close() will sync
+          flushOrSync(syncBlock && !lastPacketInBlock);
           
           replicaInfo.setLastChecksumAndDataLen(
             offsetInBlock, lastChunkChecksum
@@ -730,6 +762,7 @@ class BlockReceiver implements Closeable
       String mirrAddr, DataTransferThrottler throttlerArg,
       DatanodeInfo[] downstreams) throws IOException {
 
+      syncOnClose = datanode.getDnConf().syncOnClose;
       boolean responderClosed = false;
       mirrorOut = mirrOut;
       mirrorAddr = mirrAddr;
@@ -768,7 +801,7 @@ class BlockReceiver implements Closeable
           datanode.data.convertTemporaryToRbw(block);
         } else {
          // for isDatanode or TRANSFER_FINALIZED
-          // Finalize the block. Does this fsync()?
+          // Finalize the block.
           datanode.data.finalizeBlock(block);
         }
         datanode.metrics.incrBlocksWritten();

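The syncBlock flag carried in the packet header drives the new flushOrSync(boolean) path above: always flush, and additionally force the file channel to disk and record the latency when a sync is requested now or deferred to close(). A minimal standalone sketch of that pattern, with a hypothetical file name:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class FlushOrSyncSketch {
      // Flush the stream; when isSync is set and the stream is file-backed,
      // also force the data to stable storage and return the fsync latency in ms.
      static long flushOrSync(OutputStream out, boolean isSync) throws IOException {
        out.flush();
        if (isSync && out instanceof FileOutputStream) {
          long start = System.currentTimeMillis();
          ((FileOutputStream) out).getChannel().force(true);
          return System.currentTimeMillis() - start;   // latency to feed a metric
        }
        return 0;
      }

      public static void main(String[] args) throws IOException {
        FileOutputStream out = new FileOutputStream("sketch.tmp");  // hypothetical file
        try {
          out.write(new byte[] { 1, 2, 3 });
          System.out.println("fsync took " + flushOrSync(out, true) + " ms");
        } finally {
          out.close();
        }
      }
    }
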
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Jun  6 00:17:38 2012
@@ -701,8 +701,9 @@ class BlockSender implements java.io.Clo
    */
   private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
     pkt.clear();
+    // both syncBlock and syncPacket are false
     PacketHeader header = new PacketHeader(packetLen, offset, seqno,
-        (dataLen == 0), dataLen);
+        (dataLen == 0), dataLen, false);
     header.putInBuffer(pkt);
   }
   

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Jun  6 00:17:38 2012
@@ -163,6 +163,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
@@ -667,23 +668,16 @@ public class DataNode extends Configured
    * @param nsInfo the namespace info from the first part of the NN handshake
    */
   DatanodeRegistration createBPRegistration(NamespaceInfo nsInfo) {
-    final String xferIp = streamingAddr.getAddress().getHostAddress();
-    DatanodeRegistration bpRegistration = new DatanodeRegistration(xferIp, getXferPort());
-    bpRegistration.setInfoPort(getInfoPort());
-    bpRegistration.setIpcPort(getIpcPort());
-    bpRegistration.setHostName(hostName);
-    bpRegistration.setStorageID(getStorageId());
-    bpRegistration.setSoftwareVersion(VersionInfo.getVersion());
-
     StorageInfo storageInfo = storage.getBPStorage(nsInfo.getBlockPoolID());
     if (storageInfo == null) {
       // it's null in the case of SimulatedDataSet
-      bpRegistration.getStorageInfo().layoutVersion = HdfsConstants.LAYOUT_VERSION;
-      bpRegistration.setStorageInfo(nsInfo);
-    } else {
-      bpRegistration.setStorageInfo(storageInfo);
+      storageInfo = new StorageInfo(nsInfo);
     }
-    return bpRegistration;
+    DatanodeID dnId = new DatanodeID(
+        streamingAddr.getAddress().getHostAddress(), hostName, 
+        getStorageId(), getXferPort(), getInfoPort(), getIpcPort());
+    return new DatanodeRegistration(dnId, storageInfo, 
+        new ExportedBlockKeys(), VersionInfo.getVersion());
   }
 
   /**
@@ -1713,13 +1707,16 @@ public class DataNode extends Configured
     secureMain(args, null);
   }
 
-  public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
+  public Daemon recoverBlocks(
+      final String who,
+      final Collection<RecoveringBlock> blocks) {
+    
     Daemon d = new Daemon(threadGroup, new Runnable() {
       /** Recover a list of blocks. It is run by the primary datanode. */
       public void run() {
         for(RecoveringBlock b : blocks) {
           try {
-            logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+            logRecoverBlock(who, b);
             recoverBlock(b);
           } catch (IOException e) {
             LOG.warn("recoverBlocks FAILED: " + b, e);
@@ -1980,14 +1977,13 @@ public class DataNode extends Configured
         datanodes, storages);
   }
   
-  private static void logRecoverBlock(String who,
-      ExtendedBlock block, DatanodeID[] targets) {
-    StringBuilder msg = new StringBuilder(targets[0].toString());
-    for (int i = 1; i < targets.length; i++) {
-      msg.append(", " + targets[i]);
-    }
+  private static void logRecoverBlock(String who, RecoveringBlock rb) {
+    ExtendedBlock block = rb.getBlock();
+    DatanodeInfo[] targets = rb.getLocations();
+    
     LOG.info(who + " calls recoverBlock(block=" + block
-        + ", targets=[" + msg + "])");
+        + ", targets=[" + Joiner.on(", ").join(targets) + "]"
+        + ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
   }
 
   @Override // ClientDataNodeProtocol
@@ -2032,6 +2028,18 @@ public class DataNode extends Configured
 
     //get replica information
     synchronized(data) {
+      Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
+          b.getBlockId());
+      if (null == storedBlock) {
+        throw new IOException(b + " not found in datanode.");
+      }
+      storedGS = storedBlock.getGenerationStamp();
+      if (storedGS < b.getGenerationStamp()) {
+        throw new IOException(storedGS
+            + " = storedGS < b.getGenerationStamp(), b=" + b);
+      }
+      // Update the genstamp with storedGS
+      b.setGenerationStamp(storedGS);
       if (data.isValidRbw(b)) {
         stage = BlockConstructionStage.TRANSFER_RBW;
       } else if (data.isValidBlock(b)) {
@@ -2040,18 +2048,9 @@ public class DataNode extends Configured
         final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
         throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
       }
-
-      storedGS = data.getStoredBlock(b.getBlockPoolId(),
-          b.getBlockId()).getGenerationStamp();
-      if (storedGS < b.getGenerationStamp()) {
-        throw new IOException(
-            storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);        
-      }
       visible = data.getReplicaVisibleLength(b);
     }
-
-    //set storedGS and visible length
-    b.setGenerationStamp(storedGS);
+    //set visible length
     b.setNumBytes(visible);
 
     if (targets.length > 0) {

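The pre-check added in the last hunk above reads: the stored replica must exist, its generation stamp must be at least the requested one, and the request is then normalized to the stored stamp before the construction stage is decided. A simplified standalone sketch of that validation (the types and messages are stand-ins, not the DataNode API):

    import java.io.IOException;

    class ReplicaGenStampCheckSketch {
      // The stored replica must exist and its generation stamp must not be older
      // than the requested one; the caller then continues with the stored stamp.
      static long validateAndNormalize(Long storedGS, long requestedGS, String block)
          throws IOException {
        if (storedGS == null) {
          throw new IOException(block + " not found in datanode.");
        }
        if (storedGS < requestedGS) {
          throw new IOException(storedGS + " = storedGS < requestedGS, block=" + block);
        }
        return storedGS;
      }

      public static void main(String[] args) throws IOException {
        // A replica stored with generation stamp 1002 satisfies a request for 1001.
        System.out.println(validateAndNormalize(1002L, 1001L, "blk_42"));
      }
    }
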
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Wed Jun  6 00:17:38 2012
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.net.URLEncoder;
 import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
@@ -616,9 +617,12 @@ public class DatanodeJspHelper {
                                         Configuration conf
                                         ) throws IOException,
                                                  InterruptedException {
-    final String referrer = JspHelper.validateURL(req.getParameter("referrer"));
+    String referrer = null;
     boolean noLink = false;
-    if (referrer == null) {
+    try {
+      referrer = new URL(req.getParameter("referrer")).toString();
+    } catch (IOException e) {
+      referrer = null;
       noLink = true;
     }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java Wed Jun  6 00:17:38 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 
 /**
@@ -60,10 +61,7 @@ public class SecureDataNodeStarter imple
   @Override
   public void init(DaemonContext context) throws Exception {
     System.err.println("Initializing secure datanode resources");
-    // We should only start up a secure datanode in a Kerberos-secured cluster
-    Configuration conf = new Configuration(); // Skip UGI method to not log in
-    if(!conf.get(HADOOP_SECURITY_AUTHENTICATION).equals("kerberos"))
-      throw new RuntimeException("Cannot start secure datanode in unsecure cluster");
+    Configuration conf = new Configuration();
     
     // Stash command-line arguments for regular datanode
     args = context.getArguments();
@@ -98,7 +96,8 @@ public class SecureDataNodeStarter imple
     System.err.println("Successfully obtained privileged resources (streaming port = "
         + ss + " ) (http listener port = " + listener.getConnection() +")");
     
-    if (ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) {
+    if ((ss.getLocalPort() >= 1023 || listener.getPort() >= 1023) &&
+        UserGroupInformation.isSecurityEnabled()) {
       throw new RuntimeException("Cannot start secure datanode with unprivileged ports");
     }
     System.err.println("Opened streaming server at " + streamingAddr);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Wed Jun  6 00:17:38 2012
@@ -61,6 +61,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong writesFromLocalClient;
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;
+
+  @Metric MutableCounterLong fsyncCount;
   
   @Metric MutableCounterLong volumeFailures;
 
@@ -72,6 +74,8 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
 
+  @Metric MutableRate fsync;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
@@ -151,6 +155,14 @@ public class DataNodeMetrics {
     blocksRead.incr();
   }
 
+  public void incrFsyncCount() {
+    fsyncCount.incr();
+  }
+
+  public void addFsync(long latency) {
+    fsync.add(latency);
+  }
+
   public void shutdown() {
     DefaultMetricsSystem.shutdown();
   }

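The two additions pair a counter of fsync events with a latency rate; in the patch they are metrics2 fields declared via @Metric (MutableCounterLong and MutableRate). A plain-Java stand-in for the same bookkeeping, not the metrics2 API:

    import java.util.concurrent.atomic.AtomicLong;

    class FsyncMetricsSketch {
      private final AtomicLong fsyncCount = new AtomicLong();       // incrFsyncCount()
      private final AtomicLong fsyncOps = new AtomicLong();         // addFsync() samples
      private final AtomicLong fsyncTotalMillis = new AtomicLong();

      void incrFsyncCount() { fsyncCount.incrementAndGet(); }

      void addFsync(long latencyMillis) {
        fsyncOps.incrementAndGet();
        fsyncTotalMillis.addAndGet(latencyMillis);
      }

      double meanFsyncLatencyMillis() {
        long n = fsyncOps.get();
        return n == 0 ? 0.0 : (double) fsyncTotalMillis.get() / n;
      }
    }
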
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Jun  6 00:17:38 2012
@@ -207,7 +207,7 @@ public class BackupImage extends FSImage
       int logVersion = storage.getLayoutVersion();
       backupInputStream.setBytes(data, logVersion);
 
-      long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, 
+      long numTxnsAdvanced = logLoader.loadEditRecords(
           backupInputStream, true, lastAppliedTxId + 1, null);
       if (numTxnsAdvanced != numTxns) {
         throw new IOException("Batch of txns starting at txnid " +

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Wed Jun  6 00:17:38 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -60,19 +61,10 @@ class BackupJournalManager implements Jo
   }
 
   @Override
-  public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
-      throws IOException, CorruptionException {
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) {
     // This JournalManager is never used for input. Therefore it cannot
     // return any transactions
-    return 0;
-  }
-  
-  @Override
-  public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
-      throws IOException {
-    // This JournalManager is never used for input. Therefore it cannot
-    // return any transactions
-    throw new IOException("Unsupported operation");
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Wed Jun  6 00:17:38 2012
@@ -119,7 +119,7 @@ class EditLogBackupInputStream extends E
 
     this.version = version;
 
-    reader = new FSEditLogOp.Reader(in, version);
+    reader = new FSEditLogOp.Reader(in, tracker, version);
   }
 
   void clear() throws IOException {
@@ -129,12 +129,12 @@ class EditLogBackupInputStream extends E
   }
 
   @Override
-  public long getFirstTxId() throws IOException {
+  public long getFirstTxId() {
     return HdfsConstants.INVALID_TXID;
   }
 
   @Override
-  public long getLastTxId() throws IOException {
+  public long getLastTxId() {
     return HdfsConstants.INVALID_TXID;
   }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Wed Jun  6 00:17:38 2012
@@ -24,10 +24,14 @@ import java.io.IOException;
 import java.io.BufferedInputStream;
 import java.io.EOFException;
 import java.io.DataInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.io.IOUtils;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 
 /**
  * An implementation of the abstract class {@link EditLogInputStream}, which
@@ -35,13 +39,21 @@ import com.google.common.annotations.Vis
  */
 public class EditLogFileInputStream extends EditLogInputStream {
   private final File file;
-  private final FileInputStream fStream;
-  final private long firstTxId;
-  final private long lastTxId;
-  private final int logVersion;
-  private final FSEditLogOp.Reader reader;
-  private final FSEditLogLoader.PositionTrackingInputStream tracker;
+  private final long firstTxId;
+  private final long lastTxId;
   private final boolean isInProgress;
+  static private enum State {
+    UNINIT,
+    OPEN,
+    CLOSED
+  }
+  private State state = State.UNINIT;
+  private FileInputStream fStream = null;
+  private int logVersion = 0;
+  private FSEditLogOp.Reader reader = null;
+  private FSEditLogLoader.PositionTrackingInputStream tracker = null;
+  private DataInputStream dataIn = null;
+  static final Log LOG = LogFactory.getLog(EditLogInputStream.class);
   
   /**
    * Open an EditLogInputStream for the given file.
@@ -68,34 +80,43 @@ public class EditLogFileInputStream exte
    *         header
    */
   public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
-      boolean isInProgress)
-      throws LogHeaderCorruptException, IOException {
-    file = name;
-    fStream = new FileInputStream(name);
-
-    BufferedInputStream bin = new BufferedInputStream(fStream);
-    tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
-    DataInputStream in = new DataInputStream(tracker);
-
-    try {
-      logVersion = readLogVersion(in);
-    } catch (EOFException eofe) {
-      throw new LogHeaderCorruptException("No header found in log");
-    }
-
-    reader = new FSEditLogOp.Reader(in, logVersion);
+      boolean isInProgress) {
+    this.file = name;
     this.firstTxId = firstTxId;
     this.lastTxId = lastTxId;
     this.isInProgress = isInProgress;
   }
 
+  private void init() throws LogHeaderCorruptException, IOException {
+    Preconditions.checkState(state == State.UNINIT);
+    BufferedInputStream bin = null;
+    try {
+      fStream = new FileInputStream(file);
+      bin = new BufferedInputStream(fStream);
+      tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
+      dataIn = new DataInputStream(tracker);
+      try {
+        logVersion = readLogVersion(dataIn);
+      } catch (EOFException eofe) {
+        throw new LogHeaderCorruptException("No header found in log");
+      }
+      reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+      state = State.OPEN;
+    } finally {
+      if (reader == null) {
+        IOUtils.cleanup(LOG, dataIn, tracker, bin, fStream);
+        state = State.CLOSED;
+      }
+    }
+  }
+
   @Override
-  public long getFirstTxId() throws IOException {
+  public long getFirstTxId() {
     return firstTxId;
   }
   
   @Override
-  public long getLastTxId() throws IOException {
+  public long getLastTxId() {
     return lastTxId;
   }
 
@@ -104,33 +125,95 @@ public class EditLogFileInputStream exte
     return file.getPath();
   }
 
+  private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
+    FSEditLogOp op = null;
+    switch (state) {
+    case UNINIT:
+      try {
+        init();
+      } catch (Throwable e) {
+        LOG.error("caught exception initializing " + this, e);
+        if (skipBrokenEdits) {
+          return null;
+        }
+        Throwables.propagateIfPossible(e, IOException.class);
+      }
+      Preconditions.checkState(state != State.UNINIT);
+      return nextOpImpl(skipBrokenEdits);
+    case OPEN:
+      op = reader.readOp(skipBrokenEdits);
+      if ((op != null) && (op.hasTransactionId())) {
+        long txId = op.getTransactionId();
+        if ((txId >= lastTxId) &&
+            (lastTxId != HdfsConstants.INVALID_TXID)) {
+          //
+          // Sometimes, the NameNode crashes while it's writing to the
+          // edit log.  In that case, you can end up with an unfinalized edit log
+          // which has some garbage at the end.
+          // JournalManager#recoverUnfinalizedSegments will finalize these
+          // unfinished edit logs, giving them a defined final transaction 
+          // ID.  Then they will be renamed, so that any subsequent
+          // readers will have this information.
+          //
+          // Since there may be garbage at the end of these "cleaned up"
+          // logs, we want to be sure to skip it here if we've read everything
+          // we were supposed to read out of the stream.
+          // So we force an EOF on all subsequent reads.
+          //
+          long skipAmt = file.length() - tracker.getPos();
+          if (skipAmt > 0) {
+            LOG.warn("skipping " + skipAmt + " bytes at the end " +
+              "of edit log  '" + getName() + "': reached txid " + txId +
+              " out of " + lastTxId);
+            tracker.skip(skipAmt);
+          }
+        }
+      }
+      break;
+      case CLOSED:
+        break; // return null
+    }
+    return op;
+  }
+
   @Override
   protected FSEditLogOp nextOp() throws IOException {
-    return reader.readOp(false);
+    return nextOpImpl(false);
   }
-  
+
   @Override
   protected FSEditLogOp nextValidOp() {
     try {
-      return reader.readOp(true);
-    } catch (IOException e) {
+      return nextOpImpl(true);
+    } catch (Throwable e) {
+      LOG.error("nextValidOp: got exception while reading " + this, e);
       return null;
     }
   }
 
   @Override
   public int getVersion() throws IOException {
+    if (state == State.UNINIT) {
+      init();
+    }
     return logVersion;
   }
 
   @Override
   public long getPosition() {
-    return tracker.getPos();
+    if (state == State.OPEN) {
+      return tracker.getPos();
+    } else {
+      return 0;
+    }
   }
 
   @Override
   public void close() throws IOException {
-    fStream.close();
+    if (state == State.OPEN) {
+      dataIn.close();
+    }
+    state = State.CLOSED;
   }
 
   @Override
@@ -153,12 +236,12 @@ public class EditLogFileInputStream exte
     EditLogFileInputStream in;
     try {
       in = new EditLogFileInputStream(file);
-    } catch (LogHeaderCorruptException corrupt) {
+      in.getVersion(); // causes us to read the header
+    } catch (LogHeaderCorruptException e) {
       // If the header is malformed or the wrong value, this indicates a corruption
-      FSImage.LOG.warn("Log at " + file + " has no valid header",
-          corrupt);
+      LOG.warn("Log file " + file + " has no valid header", e);
       return new FSEditLogLoader.EditLogValidation(0,
-          HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true);
+          HdfsConstants.INVALID_TXID, true);
     }
     
     try {

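The restructuring above turns the stream into a small UNINIT/OPEN/CLOSED state machine: the constructor only records the file and txid range, and the file is opened on first use, so a missing or corrupt log surfaces as a read-time error rather than a constructor failure. A generic sketch of that lazy-open pattern (plain java.io types, not the HDFS classes):

    import java.io.BufferedReader;
    import java.io.Closeable;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;

    class LazyOpenStreamSketch implements Closeable {
      private enum State { UNINIT, OPEN, CLOSED }

      private final File file;
      private State state = State.UNINIT;
      private BufferedReader reader;

      LazyOpenStreamSketch(File file) { this.file = file; }   // constructor does no I/O

      private void init() throws IOException {
        reader = new BufferedReader(new FileReader(file));    // opened lazily, may fail here
        state = State.OPEN;
      }

      String nextLine() throws IOException {
        switch (state) {
        case UNINIT:
          init();                  // first read triggers the open
          return nextLine();
        case OPEN:
          return reader.readLine();
        case CLOSED:
        default:
          return null;             // a closed stream simply yields nothing
        }
      }

      @Override public void close() throws IOException {
        if (state == State.OPEN) {
          reader.close();
        }
        state = State.CLOSED;
      }
    }
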
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Jun  6 00:17:38 2012
@@ -41,12 +41,13 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Private
 public class EditLogFileOutputStream extends EditLogOutputStream {
   private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
+  public static final int PREALLOCATION_LENGTH = 1024 * 1024;
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
   private FileChannel fc; // channel of the file stream for sync
   private EditsDoubleBuffer doubleBuf;
-  static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB
+  static ByteBuffer fill = ByteBuffer.allocateDirect(PREALLOCATION_LENGTH);
 
   static {
     fill.position(0);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java Wed Jun  6 00:17:38 2012
@@ -45,12 +45,12 @@ public abstract class EditLogInputStream
   /** 
    * @return the first transaction which will be found in this stream
    */
-  public abstract long getFirstTxId() throws IOException;
+  public abstract long getFirstTxId();
   
   /** 
    * @return the last transaction which will be found in this stream
    */
-  public abstract long getLastTxId() throws IOException;
+  public abstract long getLastTxId();
 
 
   /**
@@ -73,14 +73,14 @@ public abstract class EditLogInputStream
     }
     return nextOp();
   }
-
+  
   /** 
    * Position the stream so that a valid operation can be read from it with
    * readOp().
    * 
    * This method can be used to skip over corrupted sections of edit logs.
    */
-  public void resync() throws IOException {
+  public void resync() {
     if (cachedOp != null) {
       return;
     }
@@ -109,7 +109,7 @@ public abstract class EditLogInputStream
     // error recovery will want to override this.
     try {
       return nextOp();
-    } catch (IOException e) {
+    } catch (Throwable e) {
       return null;
     }
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Jun  6 00:17:38 2012
@@ -25,6 +25,7 @@ import java.lang.reflect.Constructor;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -232,6 +233,10 @@ public class FSEditLog  {
         DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
 
     journalSet = new JournalSet(minimumRedundantJournals);
+    // set runtime so we can test starting with a faulty or unavailable
+    // shared directory
+    this.journalSet.setRuntimeForTesting(runtime);
+
     for (URI u : dirs) {
       boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
           .contains(u);
@@ -269,13 +274,14 @@ public class FSEditLog  {
     long segmentTxId = getLastWrittenTxId() + 1;
     // Safety check: we should never start a segment if there are
     // newer txids readable.
-    EditLogInputStream s = journalSet.getInputStream(segmentTxId, true);
-    try {
-      Preconditions.checkState(s == null,
-          "Cannot start writing at txid %s when there is a stream " +
-          "available for read: %s", segmentTxId, s);
-    } finally {
-      IOUtils.closeStream(s);
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    journalSet.selectInputStreams(streams, segmentTxId, true);
+    if (!streams.isEmpty()) {
+      String error = String.format("Cannot start writing at txid %s " +
+        "when there is a stream available for read: %s",
+        segmentTxId, streams.get(0));
+      IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0]));
+      throw new IllegalStateException(error);
     }
     
     startLogSegmentAndWriteHeaderTxn(segmentTxId);
@@ -895,7 +901,7 @@ public class FSEditLog  {
    * Used only by unit tests.
    */
   @VisibleForTesting
-  synchronized void setRuntimeForTesting(Runtime runtime) {
+  synchronized public void setRuntimeForTesting(Runtime runtime) {
     this.runtime = runtime;
     this.journalSet.setRuntimeForTesting(runtime);
   }
@@ -1199,10 +1205,10 @@ public class FSEditLog  {
       // All journals have failed, it is handled in logSync.
     }
   }
-  
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId) throws IOException {
-    return selectInputStreams(fromTxId, toAtLeastTxId, true);
+
+  public Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId) throws IOException {
+    return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
   }
 
   /**
@@ -1212,25 +1218,71 @@ public class FSEditLog  {
   * @param toAtLeastTxId the selected streams must contain this transaction
   * @param inProgressOk set to true if in-progress streams are OK
    */
-  public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId,
-      long toAtLeastTxId, boolean inProgressOk) throws IOException {
+  public synchronized Collection<EditLogInputStream> selectInputStreams(
+      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
+      boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk);
-    while (stream != null) {
-      streams.add(stream);
-      // We're now looking for a higher range, so reset the fromTxId
-      fromTxId = stream.getLastTxId() + 1;
-      stream = journalSet.getInputStream(fromTxId, inProgressOk);
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+
+    try {
+      checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
+    } catch (IOException e) {
+      if (recovery != null) {
+        // If recovery mode is enabled, continue loading even if we know we
+        // can't load up to toAtLeastTxId.
+        LOG.error(e);
+      } else {
+        closeAllStreams(streams);
+        throw e;
+      }
     }
-    
-    if (fromTxId <= toAtLeastTxId) {
-      closeAllStreams(streams);
-      throw new IOException(String.format("Gap in transactions. Expected to "
-          + "be able to read up until at least txid %d but unable to find any "
-          + "edit logs containing txid %d", toAtLeastTxId, fromTxId));
+    // This code will go away as soon as RedundantEditLogInputStream is
+    // introduced. (HDFS-3049)
+    try {
+      if (!streams.isEmpty()) {
+        streams.get(0).skipUntil(fromTxId);
+      }
+    } catch (IOException e) {
+      // We don't want to throw an exception from here, because that would make
+      // recovery impossible even if the user requested it.  An exception will
+      // be thrown later, when we don't read the starting txid we expect.
+      LOG.error("error skipping until transaction " + fromTxId, e);
     }
     return streams;
   }
+  
+  /**
+   * Check for gaps in the edit log input stream list.
+   * Note: we're assuming that the list is sorted and that txid ranges don't
+   * overlap.  This could be done better and with more generality with an
+   * interval tree.
+   */
+  private void checkForGaps(List<EditLogInputStream> streams, long fromTxId,
+      long toAtLeastTxId, boolean inProgressOk) throws IOException {
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    long txId = fromTxId;
+    while (true) {
+      if (txId > toAtLeastTxId) return;
+      if (!iter.hasNext()) break;
+      EditLogInputStream elis = iter.next();
+      if (elis.getFirstTxId() > txId) break;
+      long next = elis.getLastTxId();
+      if (next == HdfsConstants.INVALID_TXID) {
+        if (!inProgressOk) {
+          throw new RuntimeException("inProgressOk = false, but " +
+              "selectInputStreams returned an in-progress edit " +
+              "log input stream (" + elis + ")");
+        }
+        // We don't know where the in-progress stream ends.
+        // It could certainly go all the way up to toAtLeastTxId.
+        return;
+      }
+      txId = next + 1;
+    }
+    throw new IOException(String.format("Gap in transactions. Expected to "
+        + "be able to read up until at least txid %d but unable to find any "
+        + "edit logs containing txid %d", toAtLeastTxId, txId));
+  }
 
   /** 
    * Close all the streams in a collection

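A note on the new checkForGaps() above: it walks the selected streams in txid order and declares a gap as soon as a stream starts past the next expected txid, treating an in-progress stream (last txid == HdfsConstants.INVALID_TXID) as potentially reaching the target. The following standalone sketch mirrors that walk for readers who want to experiment with it; EditRange, the -1 sentinel, and the sample ranges are hypothetical stand-ins, not part of this commit.

// Minimal standalone sketch of the gap check in FSEditLog#checkForGaps().
// EditRange is a hypothetical stand-in for EditLogInputStream; -1 plays the
// role of HdfsConstants.INVALID_TXID (an in-progress segment, end unknown).
import java.util.Arrays;
import java.util.List;

public class GapCheckSketch {
  static class EditRange {
    final long firstTxId, lastTxId;   // lastTxId == -1 means "in progress"
    EditRange(long first, long last) { firstTxId = first; lastTxId = last; }
  }

  /** Returns true if the ranges cover [fromTxId, toAtLeastTxId] without a gap. */
  static boolean covers(List<EditRange> ranges, long fromTxId, long toAtLeastTxId,
                        boolean inProgressOk) {
    long txId = fromTxId;
    for (EditRange r : ranges) {
      if (txId > toAtLeastTxId) return true;      // already covered the target
      if (r.firstTxId > txId) return false;       // gap before this segment
      if (r.lastTxId == -1) return inProgressOk;  // in-progress: accept only if allowed
      txId = r.lastTxId + 1;                      // continue after this segment
    }
    return txId > toAtLeastTxId;
  }

  public static void main(String[] args) {
    List<EditRange> ok  = Arrays.asList(new EditRange(1, 10), new EditRange(11, 20));
    List<EditRange> gap = Arrays.asList(new EditRange(1, 10), new EditRange(15, 20));
    System.out.println(covers(ok, 1, 20, false));   // true
    System.out.println(covers(gap, 1, 20, false));  // false: txids 11-14 missing
  }
}
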
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Jun  6 00:17:38 2012
@@ -85,12 +85,10 @@ public class FSEditLogLoader {
    */
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
       MetaRecoveryContext recovery) throws IOException {
-    int logVersion = edits.getVersion();
-
     fsNamesys.writeLock();
     try {
       long startTime = now();
-      long numEdits = loadEditRecords(logVersion, edits, false, 
+      long numEdits = loadEditRecords(edits, false, 
                                  expectedStartingTxId, recovery);
       FSImage.LOG.info("Edits file " + edits.getName() 
           + " of size " + edits.length() + " edits # " + numEdits 
@@ -102,7 +100,7 @@ public class FSEditLogLoader {
     }
   }
 
-  long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
+  long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
                       long expectedStartingTxId, MetaRecoveryContext recovery)
       throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
@@ -141,10 +139,10 @@ public class FSEditLogLoader {
             }
           } catch (Throwable e) {
             // Handle a problem with our input
-            check203UpgradeFailure(logVersion, e);
+            check203UpgradeFailure(in.getVersion(), e);
             String errorMessage =
               formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
-            FSImage.LOG.error(errorMessage);
+            FSImage.LOG.error(errorMessage, e);
             if (recovery == null) {
                // We will only try to skip over problematic opcodes when in
                // recovery mode.
@@ -158,7 +156,7 @@ public class FSEditLogLoader {
           }
           recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
             in.getPosition();
-          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+          if (op.hasTransactionId()) {
             if (op.getTransactionId() > expectedTxId) { 
               MetaRecoveryContext.editLogLoaderPrompt("There appears " +
                   "to be a gap in the edit log.  We expected txid " +
@@ -175,7 +173,7 @@ public class FSEditLogLoader {
             }
           }
           try {
-            applyEditLogOp(op, fsDir, logVersion);
+            applyEditLogOp(op, fsDir, in.getVersion());
           } catch (Throwable e) {
             LOG.error("Encountered exception on operation " + op, e);
             MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
@@ -192,7 +190,7 @@ public class FSEditLogLoader {
             expectedTxId = lastAppliedTxId = expectedStartingTxId;
           }
           // log progress
-          if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+          if (op.hasTransactionId()) {
             long now = now();
             if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
               int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
@@ -647,112 +645,119 @@ public class FSEditLogLoader {
   }
   
   /**
-   * Return the number of valid transactions in the stream. If the stream is
-   * truncated during the header, returns a value indicating that there are
-   * 0 valid transactions. This reads through the stream but does not close
-   * it.
+   * Find the last valid transaction ID in the stream.
+   * If there are invalid or corrupt transactions in the middle of the stream,
+   * validateEditLog will skip over them.
+   * This reads through the stream but does not close it.
+   *
    * @throws IOException if the stream cannot be read due to an IO error (eg
    *                     if the log does not exist)
    */
   static EditLogValidation validateEditLog(EditLogInputStream in) {
     long lastPos = 0;
-    long firstTxId = HdfsConstants.INVALID_TXID;
     long lastTxId = HdfsConstants.INVALID_TXID;
     long numValid = 0;
-    try {
-      FSEditLogOp op = null;
-      while (true) {
-        lastPos = in.getPosition();
+    FSEditLogOp op = null;
+    while (true) {
+      lastPos = in.getPosition();
+      try {
         if ((op = in.readOp()) == null) {
           break;
         }
-        if (firstTxId == HdfsConstants.INVALID_TXID) {
-          firstTxId = op.getTransactionId();
-        }
-        if (lastTxId == HdfsConstants.INVALID_TXID
-            || op.getTransactionId() == lastTxId + 1) {
-          lastTxId = op.getTransactionId();
-        } else {
-          FSImage.LOG.error("Out of order txid found. Found " +
-            op.getTransactionId() + ", expected " + (lastTxId + 1));
-          break;
-        }
-        numValid++;
+      } catch (Throwable t) {
+        FSImage.LOG.warn("Caught exception after reading " + numValid +
+            " ops from " + in + " while determining its valid length. " +
+            "Position was " + lastPos, t);
+        break;
+      }
+      if (lastTxId == HdfsConstants.INVALID_TXID
+          || op.getTransactionId() > lastTxId) {
+        lastTxId = op.getTransactionId();
       }
-    } catch (Throwable t) {
-      // Catch Throwable and not just IOE, since bad edits may generate
-      // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
-      FSImage.LOG.debug("Caught exception after reading " + numValid +
-          " ops from " + in + " while determining its valid length.", t);
+      numValid++;
     }
-    return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
+    return new EditLogValidation(lastPos, lastTxId, false);
   }
-  
+
   static class EditLogValidation {
     private final long validLength;
-    private final long startTxId;
     private final long endTxId;
-    private final boolean corruptionDetected;
-     
-    EditLogValidation(long validLength, long startTxId, long endTxId,
-        boolean corruptionDetected) {
+    private final boolean hasCorruptHeader;
+
+    EditLogValidation(long validLength, long endTxId,
+        boolean hasCorruptHeader) {
       this.validLength = validLength;
-      this.startTxId = startTxId;
       this.endTxId = endTxId;
-      this.corruptionDetected = corruptionDetected;
+      this.hasCorruptHeader = hasCorruptHeader;
     }
-    
+
     long getValidLength() { return validLength; }
-    
-    long getStartTxId() { return startTxId; }
-    
+
     long getEndTxId() { return endTxId; }
-    
-    long getNumTransactions() { 
-      if (endTxId == HdfsConstants.INVALID_TXID
-          || startTxId == HdfsConstants.INVALID_TXID) {
-        return 0;
-      }
-      return (endTxId - startTxId) + 1;
-    }
-    
-    boolean hasCorruptHeader() { return corruptionDetected; }
+
+    boolean hasCorruptHeader() { return hasCorruptHeader; }
   }
 
   /**
    * Stream wrapper that keeps track of the current stream position.
+   * 
+   * This stream also allows us to set a limit on how many bytes we can read
+   * without getting an exception.
    */
-  public static class PositionTrackingInputStream extends FilterInputStream {
+  public static class PositionTrackingInputStream extends FilterInputStream
+      implements StreamLimiter {
     private long curPos = 0;
     private long markPos = -1;
+    private long limitPos = Long.MAX_VALUE;
 
     public PositionTrackingInputStream(InputStream is) {
       super(is);
     }
 
+    private void checkLimit(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to read " + amt + " byte(s) past " +
+            "the limit at offset " + limitPos);
+      }
+    }
+    
+    @Override
     public int read() throws IOException {
+      checkLimit(1);
       int ret = super.read();
       if (ret != -1) curPos++;
       return ret;
     }
 
+    @Override
     public int read(byte[] data) throws IOException {
+      checkLimit(data.length);
       int ret = super.read(data);
       if (ret > 0) curPos += ret;
       return ret;
     }
 
+    @Override
     public int read(byte[] data, int offset, int length) throws IOException {
+      checkLimit(length);
       int ret = super.read(data, offset, length);
       if (ret > 0) curPos += ret;
       return ret;
     }
 
+    @Override
+    public void setLimit(long limit) {
+      limitPos = curPos + limit;
+    }
+
+    @Override
     public void mark(int limit) {
       super.mark(limit);
       markPos = curPos;
     }
 
+    @Override
     public void reset() throws IOException {
       if (markPos == -1) {
         throw new IOException("Not marked!");
@@ -765,6 +770,18 @@ public class FSEditLogLoader {
     public long getPos() {
       return curPos;
     }
+    
+    @Override
+    public long skip(long amt) throws IOException {
+      long extra = (curPos + amt) - limitPos;
+      if (extra > 0) {
+        throw new IOException("Tried to skip " + extra + " bytes past " +
+            "the limit at offset " + limitPos);
+      }
+      long ret = super.skip(amt);
+      curPos += ret;
+      return ret;
+    }
   }
 
   public long getLastAppliedTxId() {

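The limit support added to PositionTrackingInputStream above is what lets the edit log reader cap a single opcode at MAX_OP_SIZE (see the FSEditLogOp.java changes below): setLimit() fixes an absolute position past which any read() or skip() fails with an IOException instead of running away on a corrupt length field. A small usage sketch, assuming the hadoop-hdfs classes from this branch are on the classpath; the byte array and the limit of 8 are made up for illustration.

// Usage sketch for the limit support added to PositionTrackingInputStream.
import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.PositionTrackingInputStream;

public class LimitSketch {
  public static void main(String[] args) throws IOException {
    byte[] data = new byte[32];
    PositionTrackingInputStream in =
        new PositionTrackingInputStream(new ByteArrayInputStream(data));

    in.setLimit(8);                    // at most 8 bytes readable from here
    in.read(new byte[8]);              // fine: lands exactly on the limit
    System.out.println(in.getPos());   // 8

    try {
      in.read();                       // one byte past the limit
    } catch (IOException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
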
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Wed Jun  6 00:17:38 2012
@@ -75,6 +75,10 @@ import java.io.EOFException;
 public abstract class FSEditLogOp {
   public final FSEditLogOpCodes opCode;
   long txid;
+  /**
+   * Opcode size is limited to 1.5 megabytes
+   */
+  public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
 
 
   @SuppressWarnings("deprecation")
@@ -2228,6 +2232,7 @@ public abstract class FSEditLogOp {
    */
   public static class Reader {
     private final DataInputStream in;
+    private final StreamLimiter limiter;
     private final int logVersion;
     private final Checksum checksum;
     private final OpInstanceCache cache;
@@ -2238,7 +2243,7 @@ public abstract class FSEditLogOp {
      * @param logVersion The version of the data coming from the stream.
      */
     @SuppressWarnings("deprecation")
-    public Reader(DataInputStream in, int logVersion) {
+    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
       this.logVersion = logVersion;
       if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
         this.checksum = new PureJavaCrc32();
@@ -2252,6 +2257,7 @@ public abstract class FSEditLogOp {
       } else {
         this.in = in;
       }
+      this.limiter = limiter;
       this.cache = new OpInstanceCache();
     }
 
@@ -2263,30 +2269,76 @@ public abstract class FSEditLogOp {
      * 
      * @param skipBrokenEdits    If true, attempt to skip over damaged parts of
      * the input stream, rather than throwing an IOException
-     * @return the operation read from the stream, or null at the end of the file
-     * @throws IOException on error.
+     * @return the operation read from the stream, or null at the end of the 
+     *         file
+     * @throws IOException on error.  This function should only throw an
+     *         exception when skipBrokenEdits is false.
      */
     public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
-      FSEditLogOp op = null;
       while (true) {
         try {
-          in.mark(in.available());
-          try {
-            op = decodeOp();
-          } finally {
-            // If we encountered an exception or an end-of-file condition,
-            // do not advance the input stream.
-            if (op == null) {
-              in.reset();
-            }
+          limiter.setLimit(MAX_OP_SIZE);
+          in.mark(MAX_OP_SIZE);
+          return decodeOp();
+        } catch (GarbageAfterTerminatorException e) {
+          in.reset();
+          if (!skipBrokenEdits) {
+            throw e;
+          }
+          // If we saw a terminator opcode followed by a long region of 0x00 or
+          // 0xff, we want to skip over that region, because there's nothing
+          // interesting there.
+          long numSkip = e.getNumAfterTerminator();
+          if (in.skip(numSkip) < numSkip) {
+            FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
+              "garbage after an OP_INVALID.  Unexpected early EOF.");
+            return null;
           }
-          return op;
         } catch (IOException e) {
+          in.reset();
           if (!skipBrokenEdits) {
             throw e;
           }
-          if (in.skip(1) < 1) {
-            return null;
+        } catch (RuntimeException e) {
+          // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
+          // However, we handle it here for recovery mode, just to be more
+          // robust.
+          in.reset();
+          if (!skipBrokenEdits) {
+            throw e;
+          }
+        } catch (Throwable e) {
+          in.reset();
+          if (!skipBrokenEdits) {
+            throw new IOException("got unexpected exception " +
+                e.getMessage(), e);
+          }
+        }
+        // Move ahead one byte and re-try the decode process.
+        if (in.skip(1) < 1) {
+          return null;
+        }
+      }
+    }
+
+    private void verifyTerminator() throws IOException {
+      long off = 0;
+      /** The end of the edit log should contain only 0x00 or 0xff bytes.
+       * If it contains other bytes, the log itself may be corrupt.
+       * It is important to check this; if we don't, a stray OP_INVALID byte 
+       * could make us stop reading the edit log halfway through, and we'd never
+       * know that we had lost data.
+       */
+      byte[] buf = new byte[4096];
+      while (true) {
+        int numRead = in.read(buf);
+        if (numRead == -1) {
+          return;
+        }
+        for (int i = 0; i < numRead; i++, off++) {
+          if ((buf[i] != (byte)0) && (buf[i] != (byte)-1)) {
+            throw new GarbageAfterTerminatorException("Read garbage after " +
+                "the terminator!", off);
           }
         }
       }
@@ -2306,8 +2358,10 @@ public abstract class FSEditLogOp {
       }
 
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-      if (opCode == OP_INVALID)
+      if (opCode == OP_INVALID) {
+        verifyTerminator();
         return null;
+      }
 
       FSEditLogOp op = cache.get(opCode);
       if (op == null) {
@@ -2477,4 +2531,35 @@ public abstract class FSEditLogOp {
     short mode = Short.valueOf(st.getValue("MODE"));
     return new PermissionStatus(username, groupname, new FsPermission(mode));
   }
-		}
+
+  /**
+   * Exception indicating that we found an OP_INVALID followed by some 
+   * garbage.  An OP_INVALID should signify the end of the file... if there 
+   * is additional content after that, then the edit log is corrupt. 
+   */
+  static class GarbageAfterTerminatorException extends IOException {
+    private static final long serialVersionUID = 1L;
+    private final long numAfterTerminator;
+
+    public GarbageAfterTerminatorException(String str,
+        long numAfterTerminator) {
+      super(str);
+      this.numAfterTerminator = numAfterTerminator;
+    }
+
+    /**
+     * Get the number of bytes after the terminator at which the garbage
+     * appeared.
+     *
+     * So if you had an OP_INVALID followed immediately by another valid opcode,
+     * this would be 0.
+     * If you had an OP_INVALID followed by some padding bytes, followed by a
+     * stray byte at the end, this would be the number of padding bytes.
+     * 
+     * @return numAfterTerminator
+     */
+    public long getNumAfterTerminator() {
+      return numAfterTerminator;
+    }
+  }
+}

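verifyTerminator() above encodes a simple rule: once the OP_INVALID terminator has been read, everything that remains in the segment must be 0x00 or 0xff padding, and anything else raises GarbageAfterTerminatorException carrying the offset of the first bad byte. Below is a minimal standalone sketch of that rule, operating on an in-memory byte[] rather than the real stream; the helper and the sample arrays are hypothetical.

// Standalone sketch of the rule enforced by Reader#verifyTerminator().
public class TerminatorSketch {
  /**
   * @return -1 if the trailer is clean padding, otherwise the offset (relative
   *         to the terminator) of the first garbage byte.
   */
  static long firstGarbageByte(byte[] trailer) {
    for (int i = 0; i < trailer.length; i++) {
      if (trailer[i] != (byte) 0x00 && trailer[i] != (byte) 0xff) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    byte[] clean = { 0, 0, (byte) 0xff, 0 };
    byte[] dirty = { 0, 0, 0x42, 0 };
    System.out.println(firstGarbageByte(clean)); // -1
    System.out.println(firstGarbageByte(dirty)); // 2: would trigger the exception
  }
}
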
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Jun  6 00:17:38 2012
@@ -54,12 +54,14 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -88,9 +90,6 @@ public class FSImage implements Closeabl
 
   private final NNStorageRetentionManager archivalManager;
 
-  private SaveNamespaceContext curSaveNamespaceContext = null; 
-
-
   /**
    * Construct an FSImage
    * @param conf Configuration
@@ -536,6 +535,11 @@ public class FSImage implements Closeabl
     return editLog;
   }
 
+  @VisibleForTesting
+  void setEditLogForTesting(FSEditLog newLog) {
+    editLog = newLog;
+  }
+
   void openEditLogForWrite() throws IOException {
     assert editLog != null : "editLog must be initialized";
     editLog.openForWrite();
@@ -555,7 +559,7 @@ public class FSImage implements Closeabl
 
   /**
    * Choose latest image from one of the directories,
-   * load it and merge with the edits from that directory.
+   * load it and merge with the edits.
    * 
    * Saving and loading fsimage should never trigger symlink resolution. 
    * The paths that are persisted do not have *intermediate* symlinks 
@@ -591,7 +595,7 @@ public class FSImage implements Closeabl
       // OK to not be able to read all of edits right now.
       long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
       editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
-          toAtLeastTxId, false);
+          toAtLeastTxId, recovery, false);
     } else {
       editStreams = FSImagePreTransactionalStorageInspector
         .getEditLogStreams(storage);
@@ -599,7 +603,10 @@ public class FSImage implements Closeabl
  
     LOG.debug("Planning to load image :\n" + imageFile);
     for (EditLogInputStream l : editStreams) {
-      LOG.debug("\t Planning to load edit stream: " + l);
+      LOG.debug("Planning to load edit log stream: " + l);
+    }
+    if (!editStreams.iterator().hasNext()) {
+      LOG.info("No edit log streams selected.");
     }
     
     try {
@@ -798,17 +805,28 @@ public class FSImage implements Closeabl
         try {
           thread.join();
         } catch (InterruptedException iex) {
-          LOG.error("Caught exception while waiting for thread " +
+          LOG.error("Caught interrupted exception while waiting for thread " +
                     thread.getName() + " to finish. Retrying join");
         }        
       }
     }
   }
+  
+  /**
+   * @see #saveNamespace(FSNamesystem, Canceler)
+   */
+  public synchronized void saveNamespace(FSNamesystem source)
+      throws IOException {
+    saveNamespace(source, null);
+  }
+  
   /**
    * Save the contents of the FS image to a new image file in each of the
    * current storage directories.
+   * @param canceler 
    */
-  public synchronized void saveNamespace(FSNamesystem source) throws IOException {
+  public synchronized void saveNamespace(FSNamesystem source,
+      Canceler canceler) throws IOException {
     assert editLog != null : "editLog must be initialized";
     storage.attemptRestoreRemovedStorage();
 
@@ -819,7 +837,7 @@ public class FSImage implements Closeabl
     }
     long imageTxId = getLastAppliedOrWrittenTxId();
     try {
-      saveFSImageInAllDirs(source, imageTxId);
+      saveFSImageInAllDirs(source, imageTxId, canceler);
       storage.writeAll();
     } finally {
       if (editLogWasOpen) {
@@ -831,27 +849,27 @@ public class FSImage implements Closeabl
         storage.writeTransactionIdFileToStorage(imageTxId + 1);
       }
     }
-    
-  }
-  
-  public void cancelSaveNamespace(String reason)
-      throws InterruptedException {
-    SaveNamespaceContext ctx = curSaveNamespaceContext;
-    if (ctx != null) {
-      ctx.cancel(reason); // waits until complete
-    }
   }
 
-  
+  /**
+   * @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
+   */
   protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
+      throws IOException {
+    saveFSImageInAllDirs(source, txid, null);
+  }
+
+  protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
+      Canceler canceler)
       throws IOException {    
     if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
       throw new IOException("No image directories available!");
     }
-    
+    if (canceler == null) {
+      canceler = new Canceler();
+    }
     SaveNamespaceContext ctx = new SaveNamespaceContext(
-        source, txid);
-    curSaveNamespaceContext = ctx;
+        source, txid, canceler);
     
     try {
       List<Thread> saveThreads = new ArrayList<Thread>();
@@ -872,7 +890,7 @@ public class FSImage implements Closeabl
         throw new IOException(
           "Failed to save in any storage directories while saving namespace.");
       }
-      if (ctx.isCancelled()) {
+      if (canceler.isCancelled()) {
         deleteCancelledCheckpoint(txid);
         ctx.checkCancelled(); // throws
         assert false : "should have thrown above!";

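The new Canceler-based saveNamespace() overload above moves cancellation out of FSImage and into the caller's hands. Here is a sketch of how a caller might drive it, assuming Canceler also exposes a cancel(String) counterpart to the isCancelled() call seen in saveFSImageInAllDirs(); the saveWithTimeout() helper and its names are hypothetical.

// Sketch of driving the cancellable saveNamespace() from a caller.
import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.util.Canceler;

public class CancelSketch {
  static void saveWithTimeout(final FSImage image, final FSNamesystem fsn,
      long timeoutMs) throws InterruptedException {
    final Canceler canceler = new Canceler();

    Thread saver = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          // The per-directory saver threads observe the Canceler through
          // SaveNamespaceContext and abandon the checkpoint when cancelled.
          image.saveNamespace(fsn, canceler);
        } catch (IOException e) {
          // A cancelled save surfaces here; a real caller would log it.
        }
      }
    }, "saveNamespace");

    saver.start();
    saver.join(timeoutMs);
    if (saver.isAlive()) {
      canceler.cancel("saveNamespace took longer than " + timeoutMs + " ms");
      saver.join();
    }
  }
}
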
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Jun  6 00:17:38 2012
@@ -540,7 +540,6 @@ class FSImageFormat {
     private void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
-      context.checkCancelled();
       List<INode> children = current.getChildrenRaw();
       if (children == null || children.isEmpty())
         return;
@@ -554,9 +553,13 @@ class FSImageFormat {
         out.write(currentDirName.array(), 0, prefixLen);
       }
       out.writeInt(children.size());
+      int i = 0;
       for(INode child : children) {
         // print all children first
         FSImageSerialization.saveINode2Image(child, out);
+        if (i++ % 50 == 0) {
+          context.checkCancelled();
+        }
       }
       for(INode child : children) {
         if(!child.isDirectory())