Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/05/14 08:59:47 UTC

svn commit: r656122 [1/2] - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/ src/webapps/datanode/

Author: dhruba
Date: Tue May 13 23:59:46 2008
New Revision: 656122

URL: http://svn.apache.org/viewvc?rev=656122&view=rev
Log:
HADOOP-2656.  The Block object has a generation stamp inside it.
Existing blocks get a generation stamp of 0. This is needed to support
appends. (dhruba)
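
The gist of the change, as a small standalone sketch (illustrative only, not the actual
org.apache.hadoop.dfs.Block source, which appears in the diff further down): a block is
now serialized as three longs instead of two, and blocks written before this change are
assigned the "grandfather" stamp of 0.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Sketch of the new on-wire layout: block id, length, generation stamp.
    class BlockSketch {
      // stamp assigned to blocks that pre-date generation stamps
      static final long GRANDFATHER_GENERATION_STAMP = 0;

      long blockId;
      long numBytes;
      long generationStamp;

      void write(DataOutput out) throws IOException {
        out.writeLong(blockId);
        out.writeLong(numBytes);
        out.writeLong(generationStamp);   // new third field
      }

      void readFields(DataInput in) throws IOException {
        blockId = in.readLong();
        numBytes = in.readLong();
        generationStamp = in.readLong();  // absent in the pre-upgrade format
      }
    }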


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NamenodeFsck.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBalancer.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestBlockReplacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFsck.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestPendingReplication.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplication.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestSimulatedFSDataset.java
    hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp
    hadoop/core/trunk/src/webapps/datanode/browseDirectory.jsp
    hadoop/core/trunk/src/webapps/datanode/tail.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 13 23:59:46 2008
@@ -47,6 +47,10 @@
     HADOOP-3329.  DatanodeDescriptor objects should not be stored in the
     fsimage. (dhruba)
 
+    HADOOP-2656.  The Block object has a generation stamp inside it.
+    Existing blocks get a generation stamp of 0. This is needed to support
+    appends. (dhruba)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Balancer.java Tue May 13 23:59:46 2008
@@ -344,6 +344,7 @@
       out.writeShort(FSConstants.DATA_TRANSFER_VERSION);
       out.writeByte(FSConstants.OP_COPY_BLOCK);
       out.writeLong(block.getBlock().getBlockId());
+      out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
       target.write(out);
       out.flush();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/Block.java Tue May 13 23:59:46 2008
@@ -35,9 +35,13 @@
        });
   }
 
+  // generation stamp of blocks that pre-date the introduction of
+  // a generation stamp.
+  static final long GRANDFATHER_GENERATION_STAMP = 0;
+
   /**
    */
-  public static boolean isBlockFilename(File f) {
+  static boolean isBlockFilename(File f) {
     String name = f.getName();
     if ( name.startsWith( "blk_" ) && 
         name.indexOf( '.' ) < 0 ) {
@@ -49,19 +53,22 @@
 
   long blkid;
   long len;
+  long generationStamp;
 
   /**
    */
   public Block() {
     this.blkid = 0;
     this.len = 0;
+    this.generationStamp = 0;
   }
 
   /**
    */
-  public Block(final long blkid, final long len) {
+  public Block(final long blkid, final long len, final long generationStamp) {
     this.blkid = blkid;
     this.len = len;
+    this.generationStamp = generationStamp;
   }
 
   /**
@@ -69,21 +76,24 @@
   public Block(Block blk) {
     this.blkid = blk.blkid;
     this.len = blk.len;
+    this.generationStamp = blk.generationStamp;
   }
 
   /**
    * Find the blockid from the given filename
    */
-  public Block(File f, long len) {
+  public Block(File f, long len, long genstamp) {
     String name = f.getName();
     name = name.substring("blk_".length());
     this.blkid = Long.parseLong(name);
     this.len = len;
+    this.generationStamp = genstamp;
   }
 
-  public void set(long blkid, long len) {
+  public void set(long blkid, long len, long genStamp) {
     this.blkid = blkid;
     this.len = len;
+    this.generationStamp = genStamp;
   }
   /**
    */
@@ -106,10 +116,14 @@
     this.len = len;
   }
 
+  long getGenerationStamp() {
+    return generationStamp;
+  }
+
   /**
    */
   public String toString() {
-    return getBlockName();
+    return getBlockName() + "_" + getGenerationStamp();
   }
 
   /////////////////////////////////////
@@ -118,11 +132,13 @@
   public void write(DataOutput out) throws IOException {
     out.writeLong(blkid);
     out.writeLong(len);
+    out.writeLong(generationStamp);
   }
 
   public void readFields(DataInput in) throws IOException {
     this.blkid = in.readLong();
     this.len = in.readLong();
+    this.generationStamp = in.readLong();
     if (len < 0) {
       throw new IOException("Unexpected block size: " + len);
     }
@@ -136,7 +152,13 @@
     if (blkid < b.blkid) {
       return -1;
     } else if (blkid == b.blkid) {
-      return 0;
+      if (generationStamp < b.generationStamp) {
+        return -1;
+      } else if (generationStamp == b.generationStamp) {
+        return 0;
+      } else {
+        return 1;
+      }
     } else {
       return 1;
     }
@@ -145,7 +167,8 @@
     if (!(o instanceof Block)) {
       return false;
     }
-    return blkid == ((Block)o).blkid;
+    return blkid == ((Block)o).blkid &&
+           generationStamp == ((Block)o).generationStamp;
   }
     
   public int hashCode() {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockListAsLongs.java Tue May 13 23:59:46 2008
@@ -26,10 +26,10 @@
  */
 class BlockListAsLongs {
   /**
-   * A block as 2 longs
-   *   block-id and block length
+   * A block as 3 longs
+   *   block-id and block length and generation stamp
    */
-  private static final int LONGS_PER_BLOCK = 2;
+  private static final int LONGS_PER_BLOCK = 3;
   
   private static int index2BlockId(int index) {
     return index*LONGS_PER_BLOCK;
@@ -37,6 +37,9 @@
   private static int index2BlockLen(int index) {
     return (index*LONGS_PER_BLOCK) + 1;
   }
+  private static int index2BlockGenStamp(int index) {
+    return (index*LONGS_PER_BLOCK) + 2;
+  }
   
   private long[] blockList;
   
@@ -101,6 +104,15 @@
   long getBlockLen(final int index)  {
     return blockList[index2BlockLen(index)];
   }
+
+  /**
+   * The generation stamp of the indexTh block
+   * @param index - the block whose block-len is desired
+   * @return - the generation stamp
+   */
+  long getBlockGenStamp(final int index)  {
+    return blockList[index2BlockGenStamp(index)];
+  }
   
   /**
    * Set the indexTh block
@@ -110,5 +122,6 @@
   void setBlock(final int index, final Block b) {
     blockList[index2BlockId(index)] = b.getBlockId();
     blockList[index2BlockLen(index)] = b.getNumBytes();
+    blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
   }
 }
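
A hedged usage sketch of the packed layout above (hypothetical decoding code, not part of
the commit): a block report is a flat long[] with three entries per block, so a report can
be walked as follows.

    // Sketch: decode a block report packed as LONGS_PER_BLOCK = 3 longs per block
    // (block id, length, generation stamp).
    class BlockReportSketch {
      static final int LONGS_PER_BLOCK = 3;

      static void printReport(long[] blockList) {
        int numBlocks = blockList.length / LONGS_PER_BLOCK;
        for (int i = 0; i < numBlocks; i++) {
          long id       = blockList[i * LONGS_PER_BLOCK];
          long len      = blockList[i * LONGS_PER_BLOCK + 1];
          long genStamp = blockList[i * LONGS_PER_BLOCK + 2];
          System.out.println("blk_" + id + " len=" + len + " genstamp=" + genStamp);
        }
      }

      public static void main(String[] args) {
        // two blocks; the first pre-dates the upgrade and carries the grandfather stamp 0
        long[] report = { 1001L, 67108864L, 0L,
                          1002L, 12345678L, 1L };
        printReport(report);
      }
    }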

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Tue May 13 23:59:46 2008
@@ -44,8 +44,6 @@
     return info;
   }
 
-  //TODO: remove generationStamp if it is defined in Block
-  private long generationStamp;
   private long lastScanTime;
 
   public BlockMetaDataInfo() {}
@@ -57,14 +55,12 @@
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeLong(generationStamp);
     out.writeLong(lastScanTime);
   }
 
   /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    generationStamp = in.readLong();
     lastScanTime = in.readLong();
   }
-}
\ No newline at end of file
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java?rev=656122&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetadataHeader.java Tue May 13 23:59:46 2008
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IOUtils;
+
+
+/**
+ * BlockMetadataHeader manages metadata for data blocks on Datanodes.
+ * This is not related to the Block related functionality in Namenode.
+ * The biggest part of data block metadata is CRC for the block.
+ */
+class BlockMetadataHeader {
+
+  static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
+  
+  /**
+   * Header includes everything except the checksum(s) themselves.
+   * Version is two bytes. Following it is the DataChecksum
+   * that occupies 5 bytes. 
+   */
+  private short version;
+  private DataChecksum checksum = null;
+    
+  BlockMetadataHeader(short version, DataChecksum checksum) {
+    this.checksum = checksum;
+    this.version = version;
+  }
+    
+  short getVersion() {
+    return version;
+  }
+
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+ 
+  /**
+   * This reads all the fields till the beginning of checksum.
+   * @param in 
+   * @return Metadata Header
+   * @throws IOException
+   */
+  static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
+    return readHeader(in.readShort(), in);
+  }
+  
+  /**
+   * Reads header at the top of metadata file and returns the header.
+   * 
+   * @param dataset
+   * @param block
+   * @return
+   * @throws IOException
+   */
+  static BlockMetadataHeader readHeader(File file) throws IOException {
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new BufferedInputStream(
+                               new FileInputStream(file)));
+      return readHeader(in);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+  
+  // Version is already read.
+  private static BlockMetadataHeader readHeader(short version, DataInputStream in) 
+                                   throws IOException {
+    DataChecksum checksum = DataChecksum.newDataChecksum(in);
+    return new BlockMetadataHeader(version, checksum);
+  }
+  
+  /**
+   * This writes all the fields till the beginning of checksum.
+   * @param out DataOutputStream
+   * @param header 
+   * @return 
+   * @throws IOException
+   */
+  private static void writeHeader(DataOutputStream out, 
+                                  BlockMetadataHeader header) 
+                                  throws IOException {
+    out.writeShort(header.getVersion());
+    header.getChecksum().writeHeader(out);
+  }
+  
+  /**
+   * Writes all the fields till the beginning of checksum.
+   * @param out
+   * @param checksum
+   * @throws IOException
+   */
+  static void writeHeader(DataOutputStream out, DataChecksum checksum)
+                         throws IOException {
+    writeHeader(out, new BlockMetadataHeader(METADATA_VERSION, checksum));
+  }
+
+  /**
+   * Returns the size of the header
+   */
+  static int getHeaderSize() {
+    return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
+  }
+}
+

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue May 13 23:59:46 2008
@@ -37,9 +37,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 32: add corrupt field to LocatedBlock
+   * 33 : Block generation stamp stored in Block
    */
-  public static final long versionID = 32L;
+  public static final long versionID = 33L;
   
   ///////////////////////////////////////
   // File contents

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue May 13 23:59:46 2008
@@ -985,13 +985,14 @@
     }
 
     static BlockReader newBlockReader(Socket sock, String file, long blockId, 
-        long startOffset, long len, int bufferSize) throws IOException {
-      return newBlockReader(sock, file, blockId, startOffset, len, bufferSize,
+        long genStamp, long startOffset, long len, int bufferSize) throws IOException {
+      return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
           true);
     }
 
     /** Java Doc required */
     static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+                                       long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
                                        throws IOException {
@@ -1003,6 +1004,7 @@
       out.writeShort( DATA_TRANSFER_VERSION );
       out.write( OP_READ_BLOCK );
       out.writeLong( blockId );
+      out.writeLong( genStamp );
       out.writeLong( startOffset );
       out.writeLong( len );
       out.flush();
@@ -1268,6 +1270,7 @@
           Block blk = targetBlock.getBlock();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
+              blk.getGenerationStamp(),
               offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
               buffersize, verifyChecksum);
           return chosenNode;
@@ -1449,6 +1452,7 @@
               
           reader = BlockReader.newBlockReader(dn, src, 
                                               block.getBlock().getBlockId(),
+                                              block.getBlock().getGenerationStamp(),
                                               start, len, buffersize, 
                                               verifyChecksum);
           int nread = reader.readAll(buf, offset, len);
@@ -2288,6 +2292,7 @@
         out.writeShort( DATA_TRANSFER_VERSION );
         out.write( OP_WRITE_BLOCK );
         out.writeLong( block.getBlockId() );
+        out.writeLong( block.getGenerationStamp() );
         out.writeInt( nodes.length );
         out.writeBoolean( recoveryFlag );       // recovery flag
         Text.writeString( out, client );

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java Tue May 13 23:59:46 2008
@@ -193,8 +193,8 @@
       //still keep 'info.lastScanType' to NONE.
       addBlockInfo(info);
     }
-    
-    /* Pick the first directory that has any existing sanner log.
+
+    /* Pick the first directory that has any existing scanner log.
      * otherwise, pick the first directory.
      */
     File dir = null;
@@ -327,6 +327,7 @@
   static private class LogEntry {
     long blockId = -1;
     long verificationTime = -1;
+    long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
     
     /**
      * The format consists of single line with multiple entries. each 
@@ -340,6 +341,7 @@
     static String newEnry(Block block, long time) {
       return "date=\"" + dateFormat.format(new Date(time)) + "\"\t " +
              "time=\"" + time + "\"\t " +
+             "genstamp=\"" + block.getGenerationStamp() + "\"\t " +
              "id=\"" + block.getBlockId() +"\"";
     }
     
@@ -355,6 +357,8 @@
           entry.blockId = Long.valueOf(value);
         } else if (name.equals("time")) {
           entry.verificationTime = Long.valueOf(value);
+        } else if (name.equals("genstamp")) {
+          entry.genStamp = Long.valueOf(value);
         }
       }
       
@@ -478,7 +482,7 @@
     }
     
     // update verification times from the verificationLog.
-    Block tmpBlock = new Block(0, 0);    
+    Block tmpBlock = new Block();
     while (logReader != null && logReader.hasNext()) {
       if (!datanode.shouldRun || Thread.interrupted()) {
         return false;
@@ -486,7 +490,7 @@
       String line = logReader.next();
       LogEntry entry = LogEntry.parseEntry(line);
       synchronized (this) {
-        tmpBlock.blkid = entry.blockId;
+        tmpBlock.set(entry.blockId, 0, entry.genStamp);
         BlockScanInfo info = blockMap.get(tmpBlock);
         
         if(info != null && entry.verificationTime > 0 && 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataChecksum.java Tue May 13 23:59:46 2008
@@ -205,8 +205,8 @@
   public int getNumBytesInSum() {
     return inSum;
   }
-  public int getChecksumHeaderSize() {
-    return 2 + 1 + 4; // version: short + type : byte + bytesPerChecksum : int
+  static public int getChecksumHeaderSize() {
+    return 1 + FSConstants.SIZE_OF_INTEGER; // type byte, bytesPerChecksum int
   }
   //Checksum Interface. Just a wrapper around member summer.
   public long getValue() {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue May 13 23:59:46 2008
@@ -36,6 +36,7 @@
 import org.apache.hadoop.dfs.DatanodeProtocol;
 import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.dfs.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.dfs.BlockMetadataHeader;
 
 import java.io.*;
 import java.net.*;
@@ -131,8 +132,8 @@
   private boolean transferToAllowed = true;
   private int writePacketSize = 0;
   
-  DataBlockScanner blockScanner;
-  Daemon blockScannerThread;
+  DataBlockScanner blockScanner = null;
+  Daemon blockScannerThread = null;
   
   private static final Random R = new Random();
 
@@ -314,12 +315,11 @@
     } 
     if ( reason == null ) {
       blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
-      blockScannerThread = new Daemon(blockScanner);
     } else {
       LOG.info("Periodic Block Verification is disabled because " +
                reason + ".");
     }
-    
+
     //create a servlet to serve full-file content
     String infoAddr = 
       NetUtils.getServerAddress(conf, 
@@ -725,6 +725,14 @@
           }
           processCommand(cmd);
         }
+
+        // start block scanner
+        if (blockScanner != null && blockScannerThread == null &&
+            upgradeManager.isUpgradeCompleted()) {
+          LOG.info("Starting Periodic block scanner.");
+          blockScannerThread = new Daemon(blockScanner);
+          blockScannerThread.start();
+        }
             
         //
         // There is no work to do;  sleep until hearbeat timer elapses, 
@@ -1055,7 +1063,7 @@
       // Read in the header
       //
       long blockId = in.readLong();          
-      Block block = new Block( blockId, 0 );
+      Block block = new Block( blockId, 0 , in.readLong());
 
       long startOffset = in.readLong();
       long length = in.readLong();
@@ -1123,7 +1131,7 @@
       //
       // Read in the header
       //
-      Block block = new Block(in.readLong(), estimateBlockSize);
+      Block block = new Block(in.readLong(), estimateBlockSize, in.readLong());
       LOG.info("Receiving block " + block + 
                " src: " + remoteAddress +
                " dest: " + localAddress);
@@ -1184,6 +1192,7 @@
             mirrorOut.writeShort( DATA_TRANSFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
+            mirrorOut.writeLong( block.getGenerationStamp() );
             mirrorOut.writeInt( pipelineSize );
             mirrorOut.writeBoolean( isRecovery );
             Text.writeString( mirrorOut, client );
@@ -1281,7 +1290,7 @@
     void readMetadata(DataInputStream in) throws IOException {
       xceiverCount.incr();
 
-      Block block = new Block( in.readLong(), 0 );
+      Block block = new Block( in.readLong(), 0 , in.readLong());
       MetaDataInputStream checksumIn = null;
       DataOutputStream out = null;
       
@@ -1325,7 +1334,7 @@
     private void copyBlock(DataInputStream in) throws IOException {
       // Read in the header
       long blockId = in.readLong(); // read block id
-      Block block = new Block(blockId, 0);
+      Block block = new Block(blockId, 0, in.readLong());
       String source = Text.readString(in); // read del hint
       DatanodeInfo target = new DatanodeInfo(); // read target
       target.readFields(in);
@@ -1356,6 +1365,7 @@
         targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
         targetOut.writeByte(OP_REPLACE_BLOCK); // op code
         targetOut.writeLong(block.getBlockId()); // block id
+        targetOut.writeLong(block.getGenerationStamp()); // block id
         Text.writeString( targetOut, source); // del hint
 
         // then send data
@@ -1401,7 +1411,7 @@
       balancingSem.acquireUninterruptibly();
 
       /* read header */
-      Block block = new Block(in.readLong(), estimateBlockSize); // block id & len
+      Block block = new Block(in.readLong(), estimateBlockSize, in.readLong()); // block id & len
       String sourceID = Text.readString(in);
 
       short opStatus = OP_STATUS_SUCCESS;
@@ -1538,9 +1548,9 @@
      +----------------------------------------------+
      
      Processed in readBlock() :
-     +-------------------------------------------------------+
-     | 8 byte Block ID | 8 byte start offset | 8 byte length |
-     +-------------------------------------------------------+
+     +-------------------------------------------------------------------------+
+     | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
+     +-------------------------------------------------------------------------+
      
      Client sends optional response only at the end of receiving data.
        
@@ -1648,12 +1658,14 @@
                                           BUFFER_SIZE));
 
           // read and handle the common header here. For now just a version
-          short version = checksumIn.readShort();
+         BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+         short version = header.getVersion();
+
           if (version != FSDataset.METADATA_VERSION) {
             LOG.warn("Wrong version (" + version + ") for metadata file for "
                 + block + " ignoring ...");
           }
-          checksum = DataChecksum.newDataChecksum(checksumIn);
+          checksum = header.getChecksum();
         } else {
           LOG.warn("Could not find metadata file for " + block);
           // This only decides the buffer size. Use BUFFER_SIZE?
@@ -2597,8 +2609,7 @@
       try {
         // write data chunk header
         if (!finalized) {
-          checksumOut.writeShort(FSDataset.METADATA_VERSION);
-          checksum.writeHeader(checksumOut);
+          BlockMetadataHeader.writeHeader(checksumOut, checksum);
         }
         if (clientName.length() > 0) {
           responder = new Daemon(threadGroup, 
@@ -2690,7 +2701,7 @@
                               " which is not a multiple of bytesPerChecksum " +
                                bytesPerChecksum);
       }
-      long offsetInChecksum = checksum.getChecksumHeaderSize() + 
+      long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
                               offsetInBlock / bytesPerChecksum * checksumSize;
       if (out != null) {
        out.flush();
@@ -2755,6 +2766,7 @@
         out.writeShort(DATA_TRANSFER_VERSION);
         out.writeByte(OP_WRITE_BLOCK);
         out.writeLong(b.getBlockId());
+        out.writeLong(b.getGenerationStamp());
         out.writeInt(0);           // no pipelining
         out.writeBoolean(false);   // not part of recovery
         Text.writeString(out, ""); // client
@@ -2791,11 +2803,6 @@
   public void run() {
     LOG.info(dnRegistration + "In DataNode.run, data = " + data);
 
-    // start block scanner
-    if (blockScannerThread != null) {
-      blockScannerThread.start();
-    }
-
     // start dataXceiveServer
     dataXceiveServer.start();
         

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeBlockInfo.java Tue May 13 23:59:46 2008
@@ -109,7 +109,7 @@
     if (file == null || volume == null) {
       throw new IOException("detachBlock:Block not found. " + block);
     }
-    File meta = FSDataset.getMetaFile(file);
+    File meta = FSDataset.getMetaFile(file, block);
     if (meta == null) {
       throw new IOException("Meta file not found for block " + block);
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java Tue May 13 23:59:46 2008
@@ -352,7 +352,8 @@
     // as part this block report - which why block list is stored as longs
     Block iblk = new Block(); // a fixed new'ed block to be reused with index i
     for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
-      iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i));
+      iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
+               newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       if(storedBlock == null) { // Brand new block
         toAdd.add(new Block(iblk));

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue May 13 23:59:46 2008
@@ -314,7 +314,7 @@
     DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
     lblocks[0] = new LocatedBlock(dataBlock, dataNode);
     LOG.info("Found checksum error in data stream at block="
-        + dataBlock.getBlockName() + " on datanode="
+        + dataBlock + " on datanode="
         + dataNode[0].getName());
 
     // Find block in checksum stream
@@ -327,7 +327,7 @@
     DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
     lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
     LOG.info("Found checksum error in checksum stream at block="
-        + sumsBlock.getBlockName() + " on datanode="
+        + sumsBlock + " on datanode="
         + sumsNode[0].getName());
 
     // Ask client to delete blocks.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue May 13 23:59:46 2008
@@ -189,7 +189,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -14;
+  public static final int LAYOUT_VERSION = -15;
   // Current version: 
-  // Remove storing locations of last block of a file in fsimage
+  // Store generation stamp with each Block
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue May 13 23:59:46 2008
@@ -93,8 +93,8 @@
                           boolean resetIdx) throws IOException {
       if (numBlocks < maxBlocksPerDir) {
         File dest = new File(dir, b.getBlockName());
-        File metaData = getMetaFile( src );
-        if ( ! metaData.renameTo( getMetaFile(dest) ) ||
+        File metaData = getMetaFile( src, b );
+        if ( ! metaData.renameTo( getMetaFile(dest, b) ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
                                  " from tmp to " + 
@@ -139,6 +139,31 @@
       return children[ lastChildIdx ].addBlock(b, src, true, false); 
     }
 
+    /** Find the metadata file for the specified block file.
+     * Return the generation stamp from the name of the metafile.
+     */
+    long getGenerationStampFromFile(File[] listdir, File blockFile) {
+      String blockName = blockFile.getName();
+      for (int j = 0; j < listdir.length; j++) {
+        String path = listdir[j].getName();
+        if (!path.startsWith(blockName)) {
+          continue;
+        }
+        String[] vals = path.split("_");
+        if (vals.length != 3) {     // blk, blkid, genstamp.meta
+          continue;
+        }
+        String[] str = vals[2].split("\\.");
+        if (str.length != 2) {
+          continue;
+        }
+        return Long.parseLong(str[0]);
+      }
+      DataNode.LOG.warn("Block " + blockFile + 
+                        " does not have a metafile!");
+      return Block.GRANDFATHER_GENERATION_STAMP;
+    }
+
     /**
      * Populate the given blockSet with any child blocks
      * found at this node.
@@ -153,12 +178,12 @@
       File blockFiles[] = dir.listFiles();
       for (int i = 0; i < blockFiles.length; i++) {
         if (Block.isBlockFilename(blockFiles[i])) {
-          blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
+          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+          blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
         }
       }
     }
 
-
     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
@@ -168,9 +193,9 @@
 
       File blockFiles[] = dir.listFiles();
       for (int i = 0; i < blockFiles.length; i++) {
-        //We are not enforcing presense of metadata file
         if (Block.isBlockFilename(blockFiles[i])) {
-          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), 
+          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
                         new DatanodeBlockInfo(volume, blockFiles[i]));
         }
       }
@@ -363,7 +388,7 @@
       
     File addBlock(Block b, File f) throws IOException {
       File blockFile = dataDir.addBlock(b, f);
-      File metaFile = getMetaFile( blockFile );
+      File metaFile = getMetaFile( blockFile , b);
       dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
       return blockFile;
     }
@@ -508,9 +533,6 @@
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
     
-  static File getMetaFile( File f ) {
-    return new File( f.getAbsolutePath() + METADATA_EXTENSION );
-  }
 
   static class ActiveFile {
     File file;
@@ -525,9 +547,14 @@
     }
   } 
   
+  static File getMetaFile(File f , Block b) {
+    return new File( f.getAbsolutePath() +
+                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
+  }
   protected File getMetaFile(Block b) throws IOException {
     File blockFile = getBlockFile( b );
-    return getMetaFile(blockFile); 
+    return new File( blockFile.getAbsolutePath() + 
+                     "_" + b.getGenerationStamp() + METADATA_EXTENSION ); 
   }
 
   public boolean metaFileExists(Block b) throws IOException {
@@ -628,9 +655,9 @@
     return new FileInputStream(blockInFile.getFD());
   }
     
-  BlockWriteStreams createBlockWriteStreams( File f ) throws IOException {
+  private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
       return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
-          new FileOutputStream( new RandomAccessFile( getMetaFile( f ) , "rw" ).getFD() ));
+          new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
 
   }
 
@@ -737,7 +764,8 @@
     // REMIND - mjc - make this a filter stream that enforces a max
     // block size, so clients can't go crazy
     //
-    return createBlockWriteStreams( f );
+    File metafile = getMetaFile(f, b);
+    return createBlockWriteStreams( f , metafile);
   }
 
   /**
@@ -902,7 +930,7 @@
         v.clearPath(parent);
         volumeMap.remove(invalidBlks[i]);
       }
-      File metaFile = getMetaFile( f );
+      File metaFile = getMetaFile( f, invalidBlks[i] );
       long blockSize = f.length()+metaFile.length();
       if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
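
The net effect of the FSDataset changes above on the on-disk layout, as a small
self-contained sketch (illustrative names, not the FSDataset implementation): the checksum
file for block blk_<id> is now named blk_<id>_<genstamp>.meta, and a block whose metafile
does not follow the new naming is treated as having generation stamp 0.

    import java.io.File;

    // Sketch: build the metadata file name for a block, and recover the
    // generation stamp from an existing metafile name.
    class MetaFileNameSketch {
      static final String METADATA_EXTENSION = ".meta";
      static final long GRANDFATHER_GENERATION_STAMP = 0;

      // e.g. blk_1001 with genstamp 7 -> blk_1001_7.meta
      static File metaFileFor(File blockFile, long generationStamp) {
        return new File(blockFile.getAbsolutePath()
                        + "_" + generationStamp + METADATA_EXTENSION);
      }

      // e.g. blk_1001_7.meta -> 7; legacy blk_1001.meta -> 0
      static long genStampFromMetaFileName(String metaFileName) {
        String[] vals = metaFileName.split("_");
        if (vals.length != 3) {                  // expect "blk", "<id>", "<genstamp>.meta"
          return GRANDFATHER_GENERATION_STAMP;   // old-format metafile
        }
        return Long.parseLong(vals[2].split("\\.")[0]);
      }

      public static void main(String[] args) {
        System.out.println(metaFileFor(new File("/data/blk_1001"), 7));  // /data/blk_1001_7.meta
        System.out.println(genStampFromMetaFileName("blk_1001_7.meta")); // 7
        System.out.println(genStampFromMetaFileName("blk_1001.meta"));   // 0
      }
    }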

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue May 13 23:59:46 2008
@@ -28,6 +28,8 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.util.ArrayList;
 import java.lang.Math;
 import java.nio.channels.FileChannel;
@@ -448,7 +450,19 @@
               blockSize = readLong(in);
             }
             // get blocks
-            Block blocks[] = readBlocks(in);
+            Block blocks[] = null;
+            if (logVersion <= -14) {
+              blocks = readBlocks(in);
+            } else {
+              BlockTwo oldblk = new BlockTwo();
+              int num = in.readInt();
+              blocks = new Block[num];
+              for (int i = 0; i < num; i++) {
+                oldblk.readFields(in);
+                blocks[i] = new Block(oldblk.blkid, oldblk.len, 
+                                      Block.GRANDFATHER_GENERATION_STAMP);
+              }
+            }
 
             // Older versions of HDFS does not store the block size in inode.
             // If the file has more than one block, use the size of the
@@ -1003,6 +1017,41 @@
     sizeFlushBuffer = size;
   }
 
+  /**
+   * A class to read in blocks stored in the old format. The only two
+   * fields in the block were blockid and length.
+   */
+  static class BlockTwo implements Writable {
+    long blkid;
+    long len;
+
+    static {                                      // register a ctor
+      WritableFactories.setFactory
+        (BlockTwo.class,
+         new WritableFactory() {
+           public Writable newInstance() { return new BlockTwo(); }
+         });
+    }
+
+
+    BlockTwo() {
+      blkid = 0;
+      len = 0;
+    }
+    /////////////////////////////////////
+    // Writable
+    /////////////////////////////////////
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(blkid);
+      out.writeLong(len);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      this.blkid = in.readLong();
+      this.len = in.readLong();
+    }
+  }
+
   /** This method is defined for compatibility reason. */
   static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
       ) throws IOException {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Tue May 13 23:59:46 2008
@@ -750,7 +750,12 @@
           blocks = new Block[numBlocks];
           for (int j = 0; j < numBlocks; j++) {
             blocks[j] = new Block();
-            blocks[j].readFields(in);
+            if (-14 < imgVersion) {
+              blocks[j].set(in.readLong(), in.readLong(), 
+                            Block.GRANDFATHER_GENERATION_STAMP);
+            } else {
+              blocks[j].readFields(in);
+            }
           }
         }
         // Older versions of HDFS does not store the block size in inode.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue May 13 23:59:46 2008
@@ -1047,11 +1047,11 @@
     // Remove the block from the pending creates list
     //
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                  +b.getBlockName()+"of file "+src);
+                                  +b+"of file "+src);
     INodeFileUnderConstruction file = checkLease(src, holder);
     dir.removeBlock(src, file, b);
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    + b.getBlockName()
+                                    + b
                                     + " is removed from pendingCreates");
     return true;
   }
@@ -1185,11 +1185,12 @@
   private Block allocateBlock(String src, INode file) throws IOException {
     Block b = null;
     do {
-      b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
+      b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 
+                    getGenerationStamp());
     } while (isValidBlock(b));
     b = dir.addBlock(src, file, b);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
-                                 +src+ ". "+b.getBlockName());
+                                 +src+ ". "+b);
     return b;
   }
 
@@ -1300,14 +1301,14 @@
   public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
     throws IOException {
     NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
-                                 + blk.getBlockName() + " on " 
+                                 + blk + " on " 
                                  + dn.getName());
     if (isInSafeMode()) {
-      throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+      throw new SafeModeException("Cannot invalidate block " + blk, safeMode);
     }
     DatanodeDescriptor node = getDatanode(dn);
     if (node == null) {
-      throw new IOException("Cannot invalidate block " + blk.getBlockName() +
+      throw new IOException("Cannot invalidate block " + blk +
                             " because datanode " + dn.getName() +
                             " does not exist.");
     }
@@ -1319,11 +1320,11 @@
       addToInvalidates(blk, dn);
       removeStoredBlock(blk, node);
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk.getBlockName() + " on " 
+                                   + blk + " on " 
                                    + dn.getName() + " listed for deletion.");
     } else {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
-                                   + blk.getBlockName() + " on " 
+                                   + blk + " on " 
                                    + dn.getName() + " is the only copy and was not deleted.");
     }
   }
@@ -2100,7 +2101,7 @@
           neededReplicationsIterator.remove(); // remove from neededReplications
           replIndex--;
           NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block.getBlockName()
+              + "Removing block " + block
               + " from neededReplications as it does not belong to any file.");
           continue;
         }
@@ -2127,7 +2128,7 @@
           replIndex--;
           pendingReplications.add(block, targets.length);
           NameNode.stateChangeLog.debug(
-              "BLOCK* block " + block.getBlockName()
+              "BLOCK* block " + block
               + " is moved from neededReplications to pendingReplications");
         }
         if (NameNode.stateChangeLog.isInfoEnabled()) {
@@ -2139,7 +2140,7 @@
           NameNode.stateChangeLog.info(
                     "BLOCK* ask "
                     + srcNode.getName() + " to replicate "
-                    + block.getBlockName() + " to " + targetList);
+                    + block + " to " + targetList);
           NameNode.stateChangeLog.debug(
                     "BLOCK* neededReplications = " + neededReplications.size()
                     + " pendingReplications = " + pendingReplications.size());
@@ -2257,7 +2258,7 @@
       StringBuffer blockList = new StringBuffer();
       for(Block blk : blocksToInvalidate) {
         blockList.append(' ');
-        blockList.append(blk.getBlockName());
+        blockList.append(blk);
       }
       NameNode.stateChangeLog.info("BLOCK* ask "
           + dn.getName() + " to delete " + blockList);
@@ -2492,7 +2493,7 @@
           obsolete.add(b);
         }
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                                      +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
+                                      +"ask "+nodeID.getName()+" to delete "+b);
       }
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
@@ -2587,12 +2588,12 @@
       //
       if (!isInSafeMode()) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
-                                      +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName()+" size "+block.getNumBytes());
+                                      +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
       }
     } else {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
                                    + "Redundant addStoredBlock request received for " 
-                                   + block.getBlockName() + " on " + node.getName()
+                                   + block + " on " + node.getName()
                                    + " size " + block.getNumBytes());
     }
     //
@@ -2601,7 +2602,7 @@
     if (fileINode == null) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
                                    + "addStoredBlock request received for " 
-                                   + block.getBlockName() + " on " + node.getName()
+                                   + block + " on " + node.getName()
                                    + " size " + block.getNumBytes()
                                    + " But it does not belong to any file.");
       return block;
@@ -2812,7 +2813,7 @@
       }
       excessBlocks.add(b);
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
-                                    +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap");
+                                    +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
 
       //
       // The 'excessblocks' tracks blocks until we get confirmation
@@ -2830,7 +2831,7 @@
       }
       invalidateSet.add(b);
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
-                                    +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets");
+                                    +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
     }
   }
 
@@ -2840,10 +2841,10 @@
    */
   synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                  +block.getBlockName() + " from "+node.getName());
+                                  +block + " from "+node.getName());
     if (!blocksMap.removeNode(block, node)) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                    +block.getBlockName()+" has already been removed from node "+node);
+                                    +block+" has already been removed from node "+node);
       return;
     }
         
@@ -2867,7 +2868,7 @@
     if (excessBlocks != null) {
       excessBlocks.remove(block);
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
-                                    +block.getBlockName()+" is removed from excessBlocks");
+                                    +block+" is removed from excessBlocks");
       if (excessBlocks.size() == 0) {
         excessReplicateMap.remove(node.getStorageID());
       }
@@ -2886,16 +2887,16 @@
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null) {
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
-                                   + block.getBlockName() + " is received from an unrecorded node " 
+                                   + block + " is received from an unrecorded node " 
                                    + nodeID.getName());
       throw new IllegalArgumentException(
                                          "Unexpected exception.  Got blockReceived message from node " 
-                                         + block.getBlockName() + ", but there is no info for it");
+                                         + block + ", but there is no info for it");
     }
         
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                    +block.getBlockName()+" is received from " + nodeID.getName());
+                                    +block+" is received from " + nodeID.getName());
     }
 
     // Check if this datanode should actually be shutdown instead.
@@ -2910,7 +2911,7 @@
       delHintNode = datanodeMap.get(delHint);
       if(delHintNode == null) {
         NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
-            + block.getBlockName()
+            + block
             + " is expected to be removed from an unrecorded node " 
             + delHint);
       }

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java?rev=656122&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/GenerationStampUpgrade.java Tue May 13 23:59:46 2008
@@ -0,0 +1,981 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.net.SocketTimeoutException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+
+/**
+ * This class associates a block generation stamp with with block. This
+ * generation stamp is written to each metadata file. Please see
+ * HADOOP-1700 for details.
+ */
+class GenerationStampUpgradeDatanode extends UpgradeObjectDatanode {
+
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgrade");
+
+  DatanodeProtocol namenode;
+  InetSocketAddress namenodeAddr;
+
+  // stats
+  private AtomicInteger blocksPreviouslyUpgraded = new AtomicInteger(0);
+  private AtomicInteger blocksToUpgrade = new AtomicInteger(0);
+  private AtomicInteger blocksUpgraded = new AtomicInteger(0);
+  private AtomicInteger errors = new AtomicInteger(0);
+
+  // process the upgrade using a pool of threads.
+  static private final int poolSize = 4;
+
+  // If no progress has occured during this time, print warnings message.
+  static private final int LONG_TIMEOUT_MILLISEC = 1*60*1000; // 1 minute
+
+  // This object is needed to indicate that namenode is not running upgrade.
+  static UpgradeCommand noUpgradeOnNamenode = new UpgradeCommand();
+
+  private List<UpgradeExecutor> completedList = new LinkedList<UpgradeExecutor>();
+
+  /* This is set when the datanode misses the regular upgrade.
+   * When this is set, the datanode upgrades its blocks but does not
+   * heartbeat to the namenode.
+   */
+  private AtomicBoolean offlineUpgrade = new AtomicBoolean(false);
+  private AtomicBoolean upgradeCompleted = new AtomicBoolean(false);
+  
+  // Implement the common interfaces required by UpgradeObjectDatanode
+  
+  public int getVersion() {
+    return GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION;
+  }
+
+  /*
+   * Start the upgrade if it is not already running. It sends status to
+   * namenode even if an upgrade is already in progress.
+   */
+  public synchronized UpgradeCommand startUpgrade() throws IOException {
+    if (offlineUpgrade.get()) {
+      doUpgrade();
+    }
+    return null; 
+  }
+
+  public String getDescription() {
+    return "Block Generation Stamp Upgrade at Datanode";
+  }
+
+  public short getUpgradeStatus() {
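+    // When the upgrade finishes, the two counters are equal and the status
+    // is 100%; the equality check also avoids division by zero.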
+    return (blocksToUpgrade.get() == blocksUpgraded.get()) ? 100 :
+      (short) Math.floor(blocksUpgraded.get()*100.0/blocksToUpgrade.get());
+  }
+
+  public UpgradeCommand completeUpgrade() throws IOException {
+    // return latest stats command.
+    assert getUpgradeStatus() == 100;
+    return new DatanodeStatsCommand(getUpgradeStatus(),
+                                    getDatanode().dnRegistration,
+                                    blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
+                                    blocksToUpgrade.get()-blocksUpgraded.get(),
+                                    errors.get(),
+                                    GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
+  }
+  
+  @Override
+  boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
+    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
+    if(nsUpgradeVersion >= getVersion()) {
+      return false; // Normal upgrade.
+    }
+    
+    LOG.info("\n  This Datanode has missed a cluster wide Block generation Stamp Upgrade." +
+             "\n  Will perform an 'offline' upgrade of the blocks." +
+             "\n  During this time, Datanode does not heartbeat.");
+    
+    
+    // Namenode removes this node from the registered nodes
+    try {
+      getDatanode().namenode.errorReport(getDatanode().dnRegistration,
+                                    DatanodeProtocol.NOTIFY, 
+                                    "Performing an offline generation stamp " +
+                                    "upgrade. " +
+                                    "Will be back online once the upgrade " +
+                                    "completes. Please see datanode logs.");
+      
+    } catch(IOException ignored) {
+      LOG.info("\n  This Datanode was unable to send error report to namenode.");
+    }
+    offlineUpgrade.set(true);
+    return true;
+  }
+
+  public GenerationStampUpgradeDatanode() {
+    blocksPreviouslyUpgraded.set(0);
+    blocksToUpgrade.set(0);
+    blocksUpgraded.set(0);
+    errors.set(0);
+  }
+
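+  // The pre-upgrade metadata file name is simply the block file name with
+  // the plain FSDataset.METADATA_EXTENSION suffix appended.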
+  static File getPreGenerationMetaFile(File f) {
+    return new File(f.getAbsolutePath() + FSDataset.METADATA_EXTENSION);
+  }
+  
+  // This class is invoked by the worker thread to convert the
+  // metafile into the new format
+  //
+  class UpgradeExecutor implements Runnable {
+    Block block;
+    Throwable throwable;
+    
+    UpgradeExecutor(Block b) {
+      block = b;
+    }
+
+    public void run() {
+      try {
+        // do the real work here
+        FSDataset dataset = (FSDataset) getDatanode().data;
+        upgradeToCurVersion(dataset, block);
+      } catch (Throwable t) {
+        throwable = t;
+      }
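+      // Hand the result back to doUpgrade(), which waits on completedList.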
+      synchronized (completedList) {
+        completedList.add(this);
+        completedList.notify();
+      }
+    }
+
+    /**
+     * Upgrades the metadata file to current version if required.
+     * @param dataset
+     * @param block
+     */
+    void upgradeToCurVersion(FSDataset dataset, Block block)
+                                              throws IOException {
+      File blockFile = dataset.getBlockFile(block);
+      if (blockFile == null) {
+        throw new IOException("Could find file for " + block);
+      }
+
+      File metadataFile = dataset.getMetaFile(block);
+      File oldmetadataFile = getPreGenerationMetaFile(blockFile);
+
+      if (metadataFile.exists() && oldmetadataFile.exists()) {
+        //
+        // If both file exists and are of the same size,
+        // then delete the old one. If the sizes are not same then
+        // leave both of them and consider the upgrade as successful.
+        //
+        if (metadataFile.length() == oldmetadataFile.length()) {
+          if (!oldmetadataFile.delete()) {
+            LOG.info("Unable to delete old metadata file " + oldmetadataFile);
+          }
+        }
+      } else if (metadataFile.exists()) {
+        //
+        // Only the new file exists, nothing more to do.
+        //
+        return;
+      } else if (oldmetadataFile.exists()) {
+        //
+        // The old file exists but the new one is missing. Rename
+        // old one to new name.
+        //
+        if (!oldmetadataFile.renameTo(metadataFile)) {
+          throw new IOException("Could not rename " + oldmetadataFile +
+                                " to " + metadataFile);
+        }
+      } else {
+        throw new IOException("Could find any metadata file for " + block);
+      }
+    }
+  }
+  
+  // This method iterates through all the blocks on the datanode and
+  // performs the upgrade.
+  //
+  void doUpgrade() throws IOException {
+    
+    if (upgradeCompleted.get()) {
+      assert offlineUpgrade.get() : 
+             ("Multiple calls to doUpgrade are expected only during " +
+              "an offline upgrade");
+      return;
+    }
+    
+    FSDataset dataset = (FSDataset) getDatanode().data;
+
+    // Set up the retry policy so that each attempt waits for one minute.
+    Configuration conf = new Configuration();
+    // set rpc timeout to one minute.
+    conf.set("ipc.client.timeout", "60000");
+
+    RetryPolicy timeoutPolicy =
+       RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+               LONG_TIMEOUT_MILLISEC/1000,
+               1, TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(SocketTimeoutException.class, timeoutPolicy);
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap =
+                            new HashMap<String, RetryPolicy>();
+    // do we need to set the policy for connection failures also?
+    methodNameToPolicyMap.put("processUpgradeCommand", methodPolicy);
+
+    LOG.info("Starting Block Generation Stamp Upgrade on datanode " +
+             getDatanode());
+
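+    // Keep retrying until an RPC proxy to the namenode is established.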
+    for (;;) {
+      try {
+        namenodeAddr = getDatanode().getNameNodeAddr();
+        namenode = (DatanodeProtocol) RetryProxy.create(
+                            DatanodeProtocol.class,
+                            RPC.waitForProxy(DatanodeProtocol.class,
+                                             DatanodeProtocol.versionID,
+                                             namenodeAddr,
+                                             conf),
+                            methodNameToPolicyMap);
+        break;
+      } catch (IOException e) {
+        LOG.warn("Generation Stamp Upgrade Exception " +
+                 "while trying to connect to NameNode at " +
+                 getDatanode().getNameNodeAddr().toString() + " : " +
+                 StringUtils.stringifyException(e));
+        try {
+          Thread.sleep(10*1000);
+        } catch (InterruptedException e1) {
+          throw new IOException("Interrupted sleep while creating RPC proxy: " +
+                                e1);
+        }
+      }
+    }
+    LOG.info("Block Generation Stamp Upgrade Datanode connected to " +
+             "namenode at " + namenodeAddr);
+
+    // Get a list of all the blocks :
+    LinkedList<UpgradeExecutor> blockList = new LinkedList<UpgradeExecutor>();
+    
+    //Fill blockList with blocks to be upgraded.
+    Block [] blockArr = dataset.getBlockReport();
+    
+    for (Block b : blockArr) {
+      File blockFile = null;
+      try {
+        blockFile = dataset.getBlockFile(b);
+      } catch (IOException e) {
+        // The block might just have been deleted. Ignore it.
+        LOG.warn("Could not find file location for " + b + 
+                 ". It might already be deleted. Exception : " +
+                 StringUtils.stringifyException(e));
+        errors.getAndIncrement();
+        continue;
+      }
+      if (!blockFile.exists()) {
+        errors.getAndIncrement();
+        LOG.error("could not find block file " + blockFile);
+        continue;
+      }
+      File metaFile = dataset.getMetaFile(b);
+      File oldMetaFile = getPreGenerationMetaFile(blockFile);
+      if (metaFile.exists()) {
+        blocksPreviouslyUpgraded.getAndIncrement();
+        continue;
+      }
+      blocksToUpgrade.getAndIncrement();
+      blockList.add(new UpgradeExecutor(b));
+    }
+    blockArr = null;
+    int nLeft = blockList.size();
+    
+    LOG.info("Starting upgrade of " + blocksToUpgrade.get() + " blocks out of " +
+             (blocksToUpgrade.get() + blocksPreviouslyUpgraded.get()));
+
+    // Start the pool of upgrade workers
+    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
+    for (Iterator<UpgradeExecutor> it = blockList.iterator(); it.hasNext();) {
+      pool.submit(it.next());
+    }
+
+    // Inform the namenode
+    sendStatus();
+    
+    // Report status to namenode every so many seconds:
+    long now = System.currentTimeMillis();
+    long statusReportIntervalMilliSec = 30*1000;
+    long lastStatusReportTime = now;
+    long lastUpdateTime = now;
+    long lastWarnTime = now;
+    
+    // Now wait for the tasks to complete.
+    //
+    while (nLeft > 0) {
+      synchronized (completedList) {
+        if (completedList.size() <= 0) {
+          try {
+            completedList.wait(1000);
+          } catch (InterruptedException ignored) {}
+        }
+        
+        now = System.currentTimeMillis();
+        
+        if (completedList.size()> 0) {
+          UpgradeExecutor exe = completedList.remove(0);
+          nLeft--;
+          if (exe.throwable != null) {
+            errors.getAndIncrement();
+            LOG.error("Got an exception during generation stamp upgrade of " +
+                      exe.block + ": " + 
+                      StringUtils.stringifyException(exe.throwable));
+          }
+          blocksUpgraded.getAndIncrement();
+          lastUpdateTime = now;
+        } else {
+          if ((now - lastUpdateTime) >= LONG_TIMEOUT_MILLISEC &&
+              (now - lastWarnTime) >= LONG_TIMEOUT_MILLISEC) {
+            lastWarnTime = now;
+            LOG.warn("No block was upgraded in the last " +
+                      (LONG_TIMEOUT_MILLISEC/(60*1000)) +
+                      " minutes! Will keep waiting ... ");
+          }  
+        } 
+      }
+      
+      if ((now-lastStatusReportTime) > statusReportIntervalMilliSec) {
+        sendStatus();
+        lastStatusReportTime = System.currentTimeMillis();
+      }
+    }
+
+    pool.shutdown();
+    upgradeCompleted.set(true);
+    
+    LOG.info("Completed Block Generation Stamp Upgrade. Total of " + 
+             (blocksPreviouslyUpgraded.get() + blocksToUpgrade.get()) +
+             " blocks : " + blocksPreviouslyUpgraded.get() + " blocks previously " +
+             "upgraded, " + blocksUpgraded.get() + " blocks upgraded this time " +
+             "with " + errors.get() + " errors.");       
+
+    // Now inform the namenode about the completion.
+    // What if there is no upgrade running on the Namenode now?
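+    // Keep resending until the namenode acknowledges the final status (a
+    // namenode that is no longer running the upgrade also counts as an ack).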
+    while (!sendStatus());
+    
+  }
+  
+  /** Sends the current status and stats to the namenode and logs them locally. */
+  boolean sendStatus() {
+    LOG.info((offlineUpgrade.get() ? "Offline " : "") + 
+              "Block Generation Stamp Upgrade : " + 
+               getUpgradeStatus() + "% completed.");
+    if (offlineUpgrade.get()) {
+      return true;
+    }
+    
+    DatanodeStatsCommand cmd = null;
+    synchronized (this) {
+      cmd = new DatanodeStatsCommand(getUpgradeStatus(),
+                           getDatanode().dnRegistration,
+                           blocksPreviouslyUpgraded.get() + blocksUpgraded.get(),
+                           blocksToUpgrade.get()-blocksUpgraded.get(),
+                           errors.get(),
+                           GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);
+    }
+    UpgradeCommand reply = sendCommand(namenodeAddr, namenode, cmd, 0);
+    if (reply == null) {
+      LOG.warn("Could not send status to Namenode. Namenode might be " +
+               "overloaded or down.");
+    }
+    return reply != null;
+  }
+
+
+  // Sends a command to the namenode
+  static UpgradeCommand sendCommand(InetSocketAddress namenodeAddr,
+                                    DatanodeProtocol namenode,
+                                    UpgradeCommand cmd, int retries) {
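+    // A negative retry count means retry indefinitely.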
+    for(int i=0; i<=retries || retries<0; i++) {
+      try {
+        UpgradeCommand reply = namenode.processUpgradeCommand(cmd);
+        if (reply == null) {
+          /* The namenode might not be running an upgrade, or may have
+           * finished it already. We just return a static object. */
+          return noUpgradeOnNamenode;
+        }
+        return reply;
+      } catch (IOException e) {
+        // print the stack trace only for the last retry.
+        LOG.warn("Exception to " + namenodeAddr +
+                 " while sending command " + 
+                 cmd.getAction() + ": " + e +
+                 ((retries<0 || i>=retries)? "... will retry ..." :
+                   ": " + StringUtils.stringifyException(e)));
+      }
+    }
+    return null;
+  }
+}
+
+/**
+ * Once an upgrade starts at the namenode, this class manages the upgrade 
+ * process.
+ */
+class GenerationStampUpgradeNamenode extends UpgradeObjectNamenode {
+  
+  public static final Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.dfs.GenerationStampUpgradeNamenode");
+  
+  static final long inactivityExtension = 10*1000; // 10 seconds
+  AtomicLong lastNodeCompletionTime = new AtomicLong(0);
+
+  // The layout version before the generation stamp upgrade.
+  static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
+
+  static final int DN_CMD_STATS = 300;
+  
+  enum UpgradeStatus {
+    INITIALIZED,
+    STARTED,
+    DATANODES_DONE,
+    COMPLETED,
+  }
+  
+  UpgradeStatus upgradeStatus = UpgradeStatus.INITIALIZED;
+  
+  class DnInfo { 
+    short percentCompleted = 0;
+    long blocksUpgraded = 0;
+    long blocksRemaining = 0;
+    long errors = 0;
+    
+    DnInfo(short pcCompleted) {
+      percentCompleted = pcCompleted;
+    }
+    DnInfo() {}
+    
+    void setStats(DatanodeStatsCommand cmd) {
+      percentCompleted = cmd.getCurrentStatus();
+      blocksUpgraded = cmd.blocksUpgraded;
+      blocksRemaining = cmd.blocksRemaining;
+      errors = cmd.errors;
+    }
+    
+    boolean isDone() {
+      return percentCompleted >= 100;
+    }
+  }
+  
+  /* We should track only the storageIDs and not DatanodeID, which
+   * includes datanode name and storage id.
+   */
+  HashMap<DatanodeID, DnInfo> dnMap = new HashMap<DatanodeID, DnInfo>();
+  HashMap<DatanodeID, DnInfo> unfinishedDnMap = 
+                                      new HashMap<DatanodeID, DnInfo>();  
+
+  Daemon monitorThread;
+  double avgDatanodeCompletionPct = 0;
+  boolean forceDnCompletion = false;
+  
+  //Upgrade object interface:
+  
+  public int getVersion() {
+    return PRE_GENERATIONSTAMP_LAYOUT_VERSION;
+  }
+
+  public UpgradeCommand completeUpgrade() throws IOException {
+    return null;
+  }
+ 
+  @Override
+  public String getDescription() {
+    return "Block Generation Stamp Upgrade at Namenode";
+  }
+
+  @Override
+  public synchronized short getUpgradeStatus() {
+    // Report the average completion reported by the datanodes (100 once the upgrade is complete).
+    if (upgradeStatus == UpgradeStatus.COMPLETED) {
+      return 100;
+    }   
+    return (short) avgDatanodeCompletionPct;
+  }
+
+  @Override
+  public UpgradeCommand startUpgrade() throws IOException {
+    
+    assert monitorThread == null;
+    lastNodeCompletionTime.set(System.currentTimeMillis());
+    
+    monitorThread = new Daemon(new UpgradeMonitor());
+    monitorThread.start();    
+    return super.startUpgrade();
+  }
+  
+  @Override
+  public synchronized void forceProceed() throws IOException {    
+    if (forceDnCompletion) {
+      LOG.warn("forceProceed is already set for this upgrade. It can take " +
+               "a short while to take effect. Please wait.");
+      return;
+    }
+    
+    LOG.info("got forceProceed request for this upgrade. Datanodes upgrade " +
+             "will be considered done. It can take a few seconds to take " +
+             "effect.");
+    forceDnCompletion = true;
+  }
+
+  @Override
+  UpgradeCommand processUpgradeCommand(UpgradeCommand command) 
+                                           throws IOException {
+    switch (command.getAction()) {
+
+    case GenerationStampUpgradeNamenode.DN_CMD_STATS :
+      return handleStatsCmd(command);
+
+     default:
+       throw new IOException("Unknown Command for Generation Stamp Upgrade : " +
+                             command.getAction());
+    }
+  }
+
+  @Override
+  public UpgradeStatusReport getUpgradeStatusReport(boolean details) 
+                                                    throws IOException {
+
+    /* If 'details' is true, should we also refresh the block level stats
+     * via updateBlockLevelStats()? That could take multiple minutes.
+     */
+    
+    String replyString = "";
+    
+    short status = 0;
+    
+    synchronized (this) {
+     
+      status = getUpgradeStatus();
+     
+      replyString = String.format(
+      ((monitorThread == null) ? "\tUpgrade has not been started yet.\n" : "")+
+      ((forceDnCompletion) ? "\tForce Proceed is ON\n" : "") +
+      "\tLast Block Level Stats updated at : %tc\n" +
+      "\tLast Block Level Stats : %s\n" +
+      "\tBrief Datanode Status  : %s\n" +
+      "%s",
+      latestBlockLevelStats.updatedAt,
+      latestBlockLevelStats.statusString("\n\t                         "), 
+      printStatus("\n\t                         "), 
+      ((status < 100 && upgradeStatus == UpgradeStatus.DATANODES_DONE) ?
+      "\tNOTE: Upgrade at the Datanodes has finished. Deleteing \".crc\" " +
+      "files\n\tcan take longer than status implies.\n" : "")
+      );
+      
+      if (details) {
+        // list all the known data nodes
+        StringBuilder str = null;
+        Iterator<DatanodeID> keys = dnMap.keySet().iterator();
+        Iterator<DnInfo> values = dnMap.values().iterator();
+        
+        for(; keys.hasNext() && values.hasNext() ;) {
+          DatanodeID dn = keys.next();
+          DnInfo info = values.next();
+          String dnStr = "\t\t" + dn.getName() + "\t : " + 
+                         info.percentCompleted + " % \t" +
+                         info.blocksUpgraded + " u \t" +
+                         info.blocksRemaining + " r \t" +
+                         info.errors + " e\n";
+          if ( str == null ) {
+            str = new StringBuilder(dnStr.length()*
+                                    (dnMap.size() + (dnMap.size()+7)/8));
+          }
+          str.append(dnStr);
+        }
+        
+        replyString += "\n\tDatanode Stats (total: " + dnMap.size() + "): " +
+                       "pct Completion(%) blocks upgraded (u) " +
+                       "blocks remaining (r) errors (e)\n\n" +
+                       (( str == null ) ?
+                        "\t\tThere are no known Datanodes\n" : str);
+      }      
+    }
+    return new GenerationStampUpgradeStatusReport(
+                   PRE_GENERATIONSTAMP_LAYOUT_VERSION,
+                   status, replyString);
+  }
+
+
+  /**
+   * The namenode processes a periodic statistics message from a datanode.
+   */
+  private synchronized UpgradeCommand handleStatsCmd(UpgradeCommand cmd) {
+    
+    DatanodeStatsCommand stats = (DatanodeStatsCommand)cmd;
+    
+    DatanodeID dn = stats.datanodeId;
+    DnInfo dnInfo = dnMap.get(dn);
+    boolean alreadyCompleted = (dnInfo != null && dnInfo.isDone());
+    
+    if (dnInfo == null) {
+      dnInfo = new DnInfo();
+      dnMap.put(dn, dnInfo);
+      LOG.info("Upgrade started/resumed at datanode " + dn.getName());  
+    }
+    
+    dnInfo.setStats(stats);
+    if (!dnInfo.isDone()) {
+      unfinishedDnMap.put(dn, dnInfo);
+    }
+    
+    if (dnInfo.isDone() && !alreadyCompleted) {
+      LOG.info("upgrade completed on datanode " + dn.getName());      
+      unfinishedDnMap.remove(dn);
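+      // Remember when the last unfinished datanode completed; the monitor
+      // thread waits inactivityExtension beyond this before declaring the
+      // datanode phase done.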
+      if (unfinishedDnMap.size() == 0) {
+        lastNodeCompletionTime.set(System.currentTimeMillis());
+      }
+    }   
+    
+    //Should we send any more info?
+    return new UpgradeCommand();
+  }
+  
+  public GenerationStampUpgradeNamenode() {
+  }
+  
+  // For now we will wait for all the nodes to complete the upgrade.
+  synchronized boolean isUpgradeDone() {
+    return upgradeStatus == UpgradeStatus.COMPLETED;    
+  }
+  
+  synchronized String printStatus(String spacing) {
+    //NOTE: iterates on all the datanodes.
+    
+    // Calculate % completion on all the data nodes.
+    long errors = 0;
+    long totalCompletion = 0;
+    for( Iterator<DnInfo> it = dnMap.values().iterator(); it.hasNext(); ) {
+      DnInfo dnInfo = it.next();
+      totalCompletion += dnInfo.percentCompleted;            
+      errors += dnInfo.errors;
+    }
+    
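+    // The tiny epsilon keeps the average finite when no datanode has
+    // reported yet.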
+    avgDatanodeCompletionPct = totalCompletion/(dnMap.size() + 1e-20);
+    
+    String msg = "Avg completion of all Datanodes: " +              
+                 String.format("%.2f%%", avgDatanodeCompletionPct) +
+                 " with " + errors + " errors. " +
+                 ((unfinishedDnMap.size() > 0) ? spacing + 
+                   unfinishedDnMap.size() + " out of " + dnMap.size() +
+                   " nodes are not done." : "");
+                 
+    LOG.info("Generation Stamp Upgrade is " + (isUpgradeDone() ? 
+             "complete. " : "still running. ") + spacing + msg);
+    return msg;
+  }
+  
+  private synchronized void setStatus(UpgradeStatus status) {
+    upgradeStatus = status;
+  }
+
+  /* Checks whether the upgrade has completed, based on the datanodes' status
+   * and/or whether all the blocks have been upgraded.
+   */
+  private synchronized UpgradeStatus checkOverallCompletion() {
+    
+    if (upgradeStatus == UpgradeStatus.COMPLETED ||
+        upgradeStatus == UpgradeStatus.DATANODES_DONE) {
+      return upgradeStatus;
+    }
+    
+    if (upgradeStatus != UpgradeStatus.DATANODES_DONE) {
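+      // The datanode phase is done once every known datanode has reported
+      // 100% and at least inactivityExtension ms have passed since the last
+      // one finished, or when forceProceed was requested.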
+      boolean datanodesDone =
+        (dnMap.size() > 0 && unfinishedDnMap.size() == 0 &&
+         ( System.currentTimeMillis() - lastNodeCompletionTime.get() ) > 
+        inactivityExtension) || forceDnCompletion ;
+                 
+      if ( datanodesDone ) {
+        LOG.info("Upgrade of DataNode blocks is complete. " +
+                 ((forceDnCompletion) ? "(ForceDnCompletion is on.)" : ""));
+        upgradeStatus = UpgradeStatus.DATANODES_DONE;
+      }
+    }
+    
+    if (upgradeStatus != UpgradeStatus.DATANODES_DONE &&
+        latestBlockLevelStats.updatedAt > 0) {
+      // Check if the last block level report showed all blocks as upgraded.
+      if (latestBlockLevelStats.minimallyReplicatedBlocks == 0 &&
+          latestBlockLevelStats.underReplicatedBlocks == 0) {
+        
+        LOG.info("Marking datanode upgrade complete since all the blocks are " +
+                 "upgraded (even though some datanodes may not have " +
+                 "reported completion). Block level stats:\n\t" +
+                 latestBlockLevelStats.statusString("\n\t"));
+        upgradeStatus = UpgradeStatus.DATANODES_DONE;
+      }
+    }
+    
+    return upgradeStatus;
+  } 
+    
+  /**
+   * This class monitors the upgrade progress and periodically prints 
+   * status message to log.
+   */
+  class UpgradeMonitor implements Runnable {
+    
+    static final long statusReportIntervalMillis = 1*60*1000;
+    static final long blockReportIntervalMillis = 5*60*1000;
+    static final int sleepTimeSec = 5;
+    
+    public void run() {
+      long lastReportTime = System.currentTimeMillis();
+      long lastBlockReportTime = lastReportTime;
+      
+      while ( !isUpgradeDone() ) {
+        UpgradeStatus status = checkOverallCompletion();
+        
+        if (status == UpgradeStatus.DATANODES_DONE) {
+          setStatus(UpgradeStatus.COMPLETED);
+        }
+        
+        long now = System.currentTimeMillis();
+        
+        
+        if (now-lastBlockReportTime >= blockReportIntervalMillis) {
+          updateBlockLevelStats();
+          // Check if all the blocks have been upgraded.
+          lastBlockReportTime = now;
+        }
+        
+        if ((now - lastReportTime) >= statusReportIntervalMillis || 
+            isUpgradeDone()) {
+          printStatus("\n\t");
+          lastReportTime = now;
+        }
+
+        if (isUpgradeDone()) {
+          break;
+        }
+        
+        try {
+          Thread.sleep(sleepTimeSec*1000);
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+      LOG.info("Leaving the Generation Stamp Upgrade Namenode monitor thread");
+    }
+  }
+  
+  private BlockLevelStats latestBlockLevelStats = new BlockLevelStats();
+  // internal class to hold the stats.
+  private static class BlockLevelStats {
+    long fullyReplicatedBlocks = 0;
+    long minimallyReplicatedBlocks = 0;
+    long underReplicatedBlocks = 0; // includes unReplicatedBlocks
+    long unReplicatedBlocks = 0; // zero replicas upgraded
+    long errors;
+    long updatedAt;
+    
+    String statusString(String spacing) {
+      long totalBlocks = fullyReplicatedBlocks + 
+                         minimallyReplicatedBlocks +
+                         underReplicatedBlocks;
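+      // Epsilon avoids division by zero when there are no blocks at all.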
+      double multiplier = 100/(totalBlocks + 1e-20);
+      
+      if (spacing.equals("")) {
+        spacing = ", ";
+      }
+      
+      return String.format(
+                     "Total Blocks : %d" +
+                     "%sFully Upgraded : %.2f%%" +
+                     "%sMinimally Upgraded : %.2f%%" +
+                     "%sUnder Upgraded : %.2f%% (includes Un-upgraded blocks)" +
+                     "%sUn-upgraded : %.2f%%" + 
+                     "%sErrors : %d", totalBlocks, 
+                     spacing, (fullyReplicatedBlocks * multiplier),
+                     spacing, (minimallyReplicatedBlocks * multiplier),
+                     spacing, (underReplicatedBlocks * multiplier),
+                     spacing, (unReplicatedBlocks * multiplier),
+                     spacing, errors);
+    }
+  }
+  
+  void updateBlockLevelStats(String path, BlockLevelStats stats) {
+    DFSFileInfo[] fileArr = getFSNamesystem().dir.getListing(path);
+    
+    for (DFSFileInfo file:fileArr) {
+      if (file.isDir()) {
+        updateBlockLevelStats(file.getPath().toString(), stats);
+      } else {
+        // Get the all the blocks.
+        LocatedBlocks blockLoc = null;
+        try {
+          blockLoc = getFSNamesystem().getBlockLocations(
+              file.getPath().toString(), 0, file.getLen());
+          int numBlocks = blockLoc.locatedBlockCount();
+          for (int i=0; i<numBlocks; i++) {
+            LocatedBlock loc = blockLoc.get(i);
+            DatanodeInfo[] dnArr = loc.getLocations();
+            int numUpgraded = 0;
+            synchronized (this) {
+              for (DatanodeInfo dn:dnArr) {
+                DnInfo dnInfo = dnMap.get(dn);
+                if (dnInfo != null && dnInfo.isDone()) {
+                  numUpgraded++;
+                }
+              }
+            }
+            
+            if (numUpgraded >= file.getReplication()) {
+              stats.fullyReplicatedBlocks++;
+            } else if (numUpgraded >= getFSNamesystem().getMinReplication()) {
+              stats.minimallyReplicatedBlocks++;
+            } else {
+              stats.underReplicatedBlocks++;
+            }
+            if (numUpgraded == 0) {
+              stats.unReplicatedBlocks++;
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("BlockGenerationStampUpgrade: could not get block locations for " +
+                    file.getPath().toString() + " : " +
+                    StringUtils.stringifyException(e));
+          stats.errors++;
+        }
+      }
+    }
+  }
+  
+  void updateBlockLevelStats() {
+    /* This iterates over all the blocks and updates various 
+     * counts.
+     * Since iterating over all the blocks at once would be quite a
+     * large operation under the lock, we iterate over all the files
+     * and update the counts for blocks that belong to a file.
+     */
+      
+    LOG.info("Starting update of block level stats. " +
+             "This could take a few minutes");
+    BlockLevelStats stats = new BlockLevelStats();
+    updateBlockLevelStats("/", stats);
+    stats.updatedAt = System.currentTimeMillis();
+    
+    LOG.info("Block level stats:\n\t" + stats.statusString("\n\t"));
+    synchronized (this) {
+      latestBlockLevelStats = stats;
+    }
+  }
+}
+
+/**
+ * The Datanode sends this statistics object to the Namenode periodically.
+ */
+class DatanodeStatsCommand extends UpgradeCommand {
+  DatanodeID datanodeId;
+  int blocksUpgraded;
+  int blocksRemaining;
+  int errors;
+
+  DatanodeStatsCommand() {
+    super(GenerationStampUpgradeNamenode.DN_CMD_STATS, 0, (short)0);
+    datanodeId = new DatanodeID();
+  }
+
+  public DatanodeStatsCommand(short status, DatanodeID dn,
+                              int blocksUpgraded, int blocksRemaining,
+                              int errors, int version) {
+    super(GenerationStampUpgradeNamenode.DN_CMD_STATS, version, status);
+    //copy so that only ID part gets serialized
+    datanodeId = new DatanodeID(dn); 
+    this.blocksUpgraded = blocksUpgraded;
+    this.blocksRemaining = blocksRemaining;
+    this.errors = errors;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    datanodeId.readFields(in);
+    blocksUpgraded = in.readInt();
+    blocksRemaining = in.readInt();
+    errors = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    datanodeId.write(out);
+    out.writeInt(blocksUpgraded);
+    out.writeInt(blocksRemaining);
+    out.writeInt(errors);
+  }
+}
+
+
+/**
+ * A status report object for Generation Stamp Upgrades
+ */
+class GenerationStampUpgradeStatusReport extends UpgradeStatusReport {
+
+  String extraText = "";
+
+  public GenerationStampUpgradeStatusReport() {
+  }
+
+  public GenerationStampUpgradeStatusReport(int version, short status,
+                                            String extraText) {
+    super(version, status, false);
+    this.extraText = extraText;
+  }
+
+  @Override
+  public String getStatusText(boolean details) {
+    return super.getStatusText(details) + "\n\n" + extraText;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    extraText = Text.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, extraText);
+  }
+}
+
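
For illustration only (this snippet is not part of the commit): the datanode
stats protocol above can be exercised end to end with a small round trip
through the Writable methods added in this file. The sketch assumes it runs
inside a method in the org.apache.hadoop.dfs package (the classes are package
private), uses the java.io stream classes, and substitutes a placeholder
DatanodeID; a real datanode builds the command from dnRegistration and sends
it through DatanodeProtocol.processUpgradeCommand, as sendStatus() does.

    // Build a stats command the way sendStatus() does, serialize it, and
    // read it back into a fresh instance.
    DatanodeStatsCommand sent = new DatanodeStatsCommand(
        (short) 42,           // percent completed
        new DatanodeID(),     // placeholder identity (real code passes dnRegistration)
        1000,                 // blocks upgraded so far
        500,                  // blocks remaining
        3,                    // errors
        GenerationStampUpgradeNamenode.PRE_GENERATIONSTAMP_LAYOUT_VERSION);

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(buf));

    DatanodeStatsCommand received = new DatanodeStatsCommand();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    // 'received' now carries the same datanodeId, block counts and error count.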

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java?rev=656122&r1=656121&r2=656122&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java Tue May 13 23:59:46 2008
@@ -109,7 +109,8 @@
     s.close();
     return chosenNode;
   }
-  public void streamBlockInAscii(InetSocketAddress addr, long blockId, long blockSize, 
+  public void streamBlockInAscii(InetSocketAddress addr, long blockId, 
+                                 long genStamp, long blockSize, 
                                  long offsetIntoBlock, long chunkSizeToView, JspWriter out) 
     throws IOException {
     if (chunkSizeToView == 0) return;
@@ -122,7 +123,7 @@
       // Use the block name for file name. 
       DFSClient.BlockReader blockReader = 
         DFSClient.BlockReader.newBlockReader(s, addr.toString() + ":" + blockId,
-                                             blockId, offsetIntoBlock, 
+                                             blockId, genStamp, offsetIntoBlock, 
                                              amtToRead, 
                                              conf.getInt("io.file.buffer.size",
                                                          4096));