You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/09/20 00:12:50 UTC

svn commit: r577456 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Wed Sep 19 15:12:49 2007
New Revision: 577456

URL: http://svn.apache.org/viewvc?rev=577456&view=rev
Log:
HADOOP-89.  A client can access file data even before the creator
has closed the file. Introduce a new dfs shell command "tail".
(Dhruba Borthakur via dhruba)


Removed:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingCreates.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 19 15:12:49 2007
@@ -35,6 +35,10 @@
 
   NEW FEATURES
 
+    HADOOP-89.  A client can access file data even before the creator
+    has closed the file. Introduce a new command "tail" from dfs shell.
+    (Dhruba Borthakur via dhruba)
+
     HADOOP-1636.  Allow configuration of the number of jobs kept in
     memory by the JobTracker.  (Michael Bieniosek via omalley)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Sep 19 15:12:49 2007
@@ -25,6 +25,7 @@
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -114,31 +115,43 @@
   /**
    * Add the given filename to the fs.
    */
-  public boolean addFile(String path, Block[] blocks, short replication,
-                         long preferredBlockSize) {
+  INode addFile(String path, 
+                short replication,
+                long preferredBlockSize,
+                String clientName,
+                String clientMachine,
+                DatanodeDescriptor clientNode) 
+                throws IOException {
     waitForReady();
 
     // Always do an implicit mkdirs for parent directory tree.
     long modTime = FSNamesystem.now();
     if (!mkdirs(new Path(path).getParent().toString(), modTime)) {
-      return false;
+      return null;
+    }
+    INodeFile newNode = new INodeFileUnderConstruction(replication, 
+                                 preferredBlockSize, modTime, clientName, 
+                                 clientMachine, clientNode);
+    synchronized (rootDir) {
+      try {
+        newNode = rootDir.addNode(path, newNode);
+      } catch (FileNotFoundException e) {
+        newNode = null;
+      }
     }
-    INodeFile newNode = (INodeFile)unprotectedAddFile(path, blocks, replication,
-                                                      modTime, 
-                                                      preferredBlockSize);
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
-                                   +"failed to add "+path+" with "
-                                   +blocks.length+" blocks to the file system");
-      return false;
+                                   +"failed to add "+path
+                                   +" to the file system");
+      return null;
     }
     // add create file record to log
     fsImage.getEditLog().logCreateFile(path, newNode);
     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-                                  +path+" with "+blocks.length+" blocks is added to the file system");
-    return true;
+                                  +path+" is added to the file system");
+    return newNode;
   }
-    
+
   /**
    */
   INode unprotectedAddFile( String path, 
@@ -171,28 +184,64 @@
   }
 
   /**
-   * Add blocks to the file.
+   * Add a block to the file. Returns a reference to the added block.
+   */
+  Block addBlock(String path, INode file, Block block) throws IOException {
+    waitForReady();
+
+    synchronized (rootDir) {
+      INodeFile fileNode = (INodeFile) file;
+
+      // associate the new list of blocks with this file
+      namesystem.blocksMap.addINode(block, fileNode);
+      BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
+      fileNode.addBlock(blockInfo);
+
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+                                    + path + " with " + block
+                                    + " block is added to the in-memory "
+                                    + "file system");
+    }
+    return block;
+  }
+
+  /**
+   * Persist the block list for the inode.
+   */
+  void persistBlocks(String path, INode file) throws IOException {
+    waitForReady();
+
+    synchronized (rootDir) {
+      INodeFile fileNode = (INodeFile) file;
+
+      // create two transactions. The first one deletes the empty
+      // file and the second transaction recreates the same file
+      // with the appropriate set of blocks.
+      fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+
+      // re-add create file record to log
+      fsImage.getEditLog().logCreateFile(path, fileNode);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+                                    +path+" with "+ fileNode.getBlocks().length 
+                                    +" blocks is persisted to the file system");
+    }
+  }
+
+  /**
+   * Remove a block to the file.
    */
-  boolean addBlocks(String path, Block[] blocks) throws IOException {
+  boolean removeBlock(String path, INode file, Block block) throws IOException {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = this.getFileINode(path);
+      INodeFile fileNode = (INodeFile) file;
       if (fileNode == null) {
         throw new IOException("Unknown file: " + path);
       }
-      if (fileNode.getBlocks() != null &&
-          fileNode.getBlocks().length != 0) {
-        throw new IOException("Cannot add new blocks to " +
-                              "already existing file.");
-      }
 
-      // associate the new list of blocks with this file
-      fileNode.allocateBlocks(blocks.length);
-      for (int i = 0; i < blocks.length; i++) {
-        fileNode.setBlock(i, 
-            namesystem.blocksMap.addINode(blocks[i], fileNode));
-      }
+      // modify file-> block and blocksMap
+      fileNode.removeBlock(block);
+      namesystem.blocksMap.removeINode(block);
 
       // create two transactions. The first one deletes the empty
       // file and the second transaction recreates the same file
@@ -202,8 +251,8 @@
       // re-add create file record to log
       fsImage.getEditLog().logCreateFile(path, fileNode);
       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
-                                    +path+" with "+blocks.length
-                                    +" blocks is added to the file system");
+                                    +path+" with "+block
+                                    +" block is added to the file system");
     }
     return true;
   }
@@ -372,6 +421,28 @@
           }
           return v.toArray(new Block[v.size()]);
         }
+      }
+    }
+  }
+
+  /**
+   * Replaces the specified inode with the specified one.
+   */
+  void replaceNode(String path, INodeFile oldnode, INodeFile newnode) 
+                      throws IOException {
+    synchronized (rootDir) {
+      //
+      // Remove the node from the namespace 
+      //
+      if (!oldnode.removeNode()) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
+                                     "failed to remove " + path);
+        throw new IOException("FSDirectory.replaceNode: " +
+                              "failed to remove " + path);
+      } 
+      rootDir.addNode(path, newnode); 
+      for (Block b : newnode.getBlocks()) {
+        namesystem.blocksMap.addINode(b, newnode);
       }
     }
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Sep 19 15:12:49 2007
@@ -104,12 +104,6 @@
     new TreeMap<String, Collection<Block>>();
 
   //
-  // Keeps track of files that are being created, plus the
-  // blocks that make them up.
-  //
-  PendingCreates pendingCreates = new PendingCreates();
-
-  //
   // Stats on overall usage
   //
   long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L;
@@ -719,7 +713,15 @@
       throw new IOException( 
                             text + " is less than the required minimum " + minReplication);
   }
-    
+
+  void startFile(String src, String holder, String clientMachine, 
+                 boolean overwrite, short replication, long blockSize
+                ) throws IOException {
+    startFileInternal(src, holder, clientMachine, overwrite,
+                      replication, blockSize);
+    getEditLog().logSync();
+  }
+
   /**
    * The client would like to create a new block for the indicated
    * filename.  Return an array that consists of the block, plus a set 
@@ -731,7 +733,7 @@
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(String)}.
    */
-  synchronized void startFile(String src, 
+  synchronized void startFileInternal(String src, 
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
@@ -746,10 +748,11 @@
       throw new IOException("Invalid file name: " + src);      	  
     }
     try {
-      FileUnderConstruction pendingFile = pendingCreates.get(src);
-      if (pendingFile != null) {
+      INode myFile = dir.getFileINode(src);
+      if (myFile != null && (myFile instanceof INodeFileUnderConstruction)) {
+        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
         //
-        // If the file exists in pendingCreate, then it must be in our
+        // If the file is under construction , then it must be in our
         // leases. Find the appropriate lease record.
         //
         Lease lease = getLease(holder);
@@ -814,15 +817,6 @@
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      // Reserve space for this pending file
-      pendingCreates.put(src, 
-                         new FileUnderConstruction(replication, 
-                                                   blockSize,
-                                                   holder,
-                                                   clientMachine, 
-                                                   clientNode));
-      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
-                                    +"add "+src+" to pendingCreates for "+holder);
       synchronized (leases) {
         Lease lease = getLease(holder);
         if (lease == null) {
@@ -836,20 +830,27 @@
         }
         lease.startedCreate(src);
       }
+
+      //
+      // Now we can add the name to the filesystem. This file has no
+      // blocks associated with it.
+      //
+      INode newNode = dir.addFile(src, replication, blockSize,
+                                  holder, 
+                                  clientMachine, 
+                                  clientNode);
+      if (newNode == null) {
+        throw new IOException("DIR* NameSystem.startFile: " +
+                              "Unable to add file to namespace.");
+      }
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());
       throw ie;
     }
 
-    //
-    // Now we can add the name to the filesystem. This file has no
-    // blocks associated with it.
-    //
-    if (!dir.addFile(src, new Block[0], replication, blockSize)) {
-      throw new IOException("DIR* NameSystem.startFile: " +
-                            "Unable to add file to namespace.");
-    }
+    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+                                  +"add "+src+" to namespace for "+holder);
   }
 
   /**
@@ -882,7 +883,7 @@
       //
       // make sure that we still have the lease on this file
       //
-      FileUnderConstruction pendingFile = pendingCreates.get(src);
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
       if (pendingFile == null) {
         throw new LeaseExpiredException("No lease on " + src);
       }
@@ -898,11 +899,11 @@
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
-      fileLength = pendingFile.computeFileLength();
-      blockSize = pendingFile.getBlockSize();
+      fileLength = pendingFile.computeContentsLength();
+      blockSize = pendingFile.getPreferredBlockSize();
       clientNode = pendingFile.getClientNode();
       replication = (int)pendingFile.getReplication();
-      newBlock = allocateBlock(src);
+      newBlock = allocateBlock(src, pendingFile);
     }
 
     DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
@@ -928,13 +929,14 @@
     //
     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                   +b.getBlockName()+"of file "+src);
-    boolean status = pendingCreates.removeBlock(src, b);
-    if (status) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+    INode file = dir.getFileINode(src);
+    if (file != null) {
+      dir.removeBlock(src, file, b);
+    }
+    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                     + b.getBlockName()
                                     + " is removed from pendingCreates");
-    }
-    return status;
+    return true;
   }
 
   /**
@@ -964,8 +966,7 @@
   }
 
   /**
-   * Finalize the created file and make it world-accessible.  The
-   * FSNamesystem will already know the blocks that make up the file.
+   * The FSNamesystem will already know the blocks that make up the file.
    * Before we return, we make sure that all the file's blocks have 
    * been reported by datanodes and are replicated correctly.
    */
@@ -980,11 +981,11 @@
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
-    FileUnderConstruction pendingFile = pendingCreates.get(src);
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
 
     Block[] fileBlocks =  dir.getFileBlocks(src);
-    if ((fileBlocks != null && fileBlocks.length > 0) ||
-         pendingFile == null) {    
+    if (fileBlocks == null || fileBlocks.length == 0 ||
+        pendingFile == null) {    
       NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
                                    + "failed to complete " + src
                                    + " because dir.getFileBlocks() is " + 
@@ -999,36 +1000,16 @@
       return STILL_WAITING;
     }
         
-    Collection<Block> blocks = pendingFile.getBlocks();
-    int nrBlocks = blocks.size();
-    Block pendingBlocks[] = new Block[nrBlocks];
-
-    //
-    // We have the pending blocks, but they won't have
-    // length info in them (as they were allocated before
-    // data-write took place). Find the block stored in
-    // node descriptor.
-    //
-    int idx = 0;
-    for (Block b : blocks) {
-      Block storedBlock = blocksMap.getStoredBlock(b);
-      // according to checkFileProgress() every block is present & replicated
-      assert storedBlock != null : "Missing block " + b.getBlockName();
-      pendingBlocks[idx++] = storedBlock;
-    }
-        
-    //
-    // add blocks to the file
-    //
-    if (!dir.addBlocks(src, pendingBlocks)) {
-      return OPERATION_FAILED;
-    }
+    // The file is no longer pending.
+    // Create permanent INode, update blockmap
+    INodeFile newFile = pendingFile.convertToInodeFile();
+    dir.replaceNode(src, pendingFile, newFile);
 
-    // The file is no longer pending
-    pendingCreates.remove(src);
-    NameNode.stateChangeLog.debug(
-                                  "DIR* NameSystem.completeFile: " + src
-                                  + " is removed from pendingCreates");
+    // persist block allocations for this file
+    dir.persistBlocks(src, newFile);
+
+    NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+                                  + " blocklist persisted");
 
     synchronized (leases) {
       Lease lease = getLease(holder);
@@ -1051,6 +1032,8 @@
     // Now that the file is real, we need to be sure to replicate
     // the blocks.
     int numExpectedReplicas = pendingFile.getReplication();
+    Block[] pendingBlocks = pendingFile.getBlocks();
+    int nrBlocks = pendingBlocks.length;
     for (int i = 0; i < nrBlocks; i++) {
       // filter out containingNodes that are marked for decommission.
       NumberReplicas number = countNodes(pendingBlocks[i]);
@@ -1069,15 +1052,14 @@
   /**
    * Allocate a block at the given pending filename
    */
-  private Block allocateBlock(String src) throws IOException {
+  private Block allocateBlock(String src, INode file) throws IOException {
     Block b = null;
     do {
       b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
     } while (isValidBlock(b));
-    pendingCreates.addBlock(src, b);
+    b = dir.addBlock(src, file, b);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
-                                 +src+ ". "+b.getBlockName()+
-                                 " is created and added to pendingCreates and pendingCreateBlocks");      
+                                 +src+ ". "+b.getBlockName());
     return b;
   }
 
@@ -1086,13 +1068,13 @@
    * replicated.  If not, return false. If checkall is true, then check
    * all blocks, otherwise check only penultimate block.
    */
-  synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+  synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
     if (checkall) {
       //
       // check all blocks of the file.
       //
-      for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext();) {
-        if (blocksMap.numNodes(it.next()) < this.minReplication) {
+      for (Block block: v.getBlocks()) {
+        if (blocksMap.numNodes(block) < this.minReplication) {
           return false;
         }
       }
@@ -1490,21 +1472,36 @@
   }
 
   /**
-   * Release a pending file creation lock.
+   * Move a file that is being written to be immutable.
    * @param src The filename
    * @param holder The datanode that was creating the file
    */
   private void internalReleaseCreate(String src, String holder) throws IOException {
-    boolean status =  pendingCreates.remove(src);
-    if (status) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + src
-                                    + " is removed from pendingCreates for "
-                                    + holder + " (failure)");
-    } else {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
-                                   + "attempt to release a create lock on "+ src
-                                   + " that was not in pedingCreates");
+    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
+
+    // The last block that was allocated migth not have been used by the
+    // client. In this case, the size of the last block would be 0. A fsck
+    // will report this block as a missing block because no datanodes have it.
+    // Delete this block.
+    Block[] blocks = pendingFile.getBlocks();
+    if (blocks != null && blocks.length > 1) {
+      Block last = blocks[blocks.length - 1];
+      if (last.getNumBytes() == 0) {
+          pendingFile.removeBlock(last);
+      }
     }
+
+    // The file is no longer pending.
+    // Create permanent INode, update blockmap
+    INodeFile newFile = pendingFile.convertToInodeFile();
+    dir.replaceNode(src, pendingFile, newFile);
+
+    // persist block allocations for this file
+    dir.persistBlocks(src, newFile);
+  
+    NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + 
+                                  src + " is no longer written to by " + 
+                                  holder);
   }
 
   /**
@@ -2161,7 +2158,18 @@
     Block storedBlock = blocksMap.getStoredBlock(block); //extra look up!
     if (storedBlock != null && block != storedBlock) {
       if (block.getNumBytes() > 0) {
-        storedBlock.setNumBytes(block.getNumBytes());
+        long cursize = storedBlock.getNumBytes();
+        if (cursize == 0) {
+          storedBlock.setNumBytes(block.getNumBytes());
+        } else if (cursize != block.getNumBytes()) {
+          LOG.warn("Inconsistent size for block " + block + 
+                   " reported from " + node.getName() + 
+                   " current size is " + cursize +
+                   " reported size is " + block.getNumBytes());
+          // Accept this block even if there is a problem with its
+          // size. Clients should detect data corruption because of
+          // CRC mismatch.
+        }
       }
       block = storedBlock;
     }
@@ -2185,8 +2193,13 @@
                                    + block.getBlockName() + " on " + node.getName());
     }
 
-    if (fileINode == null)  // block does not belong to any file
+    //
+    // if file is being actively written to, then do not check 
+    // replication-factor here. It will be checked when the file is closed.
+    //
+    if (fileINode == null || fileINode instanceof INodeFileUnderConstruction) {
       return block;
+    }
         
     // filter out containingNodes that are marked for decommission.
     NumberReplicas num = countNodes(block);
@@ -3460,8 +3473,7 @@
    * Returns whether the given block is one pointed-to by a file.
    */
   private boolean isValidBlock(Block b) {
-    return (blocksMap.getINode(b) != null ||
-            pendingCreates.contains(b));
+    return (blocksMap.getINode(b) != null);
   }
 
   // Distributed upgrade manager

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Sep 19 15:12:49 2007
@@ -23,6 +23,7 @@
 import java.util.Collections;
 import java.util.Arrays;
 import java.util.List;
+import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
@@ -472,6 +473,14 @@
     allocateBlocks(nrBlocks);
   }
 
+  protected INodeFile(BlockInfo[] blklist, short replication, long modificationTime,
+                      long preferredBlockSize) {
+    super(modificationTime);
+    this.blockReplication = replication;
+    this.preferredBlockSize = preferredBlockSize;
+    blocks = blklist;
+  }
+
   boolean isDirectory() {
     return false;
   }
@@ -492,7 +501,7 @@
    * Get file blocks 
    * @return file blocks
    */
-  Block[] getBlocks() {
+  BlockInfo[] getBlocks() {
     return this.blocks;
   }
 
@@ -505,6 +514,45 @@
   }
 
   /**
+   * add a block to the block list
+   */
+  void addBlock(BlockInfo newblock) {
+    if (this.blocks == null) {
+      this.blocks = new BlockInfo[1];
+      this.blocks[0] = newblock;
+    } else {
+      int size = this.blocks.length;
+      BlockInfo[] newlist = new BlockInfo[size + 1];
+      for (int i = 0; i < size; i++) {
+        newlist[i] = this.blocks[i];
+      }
+      newlist[size] = newblock;
+      this.blocks = newlist;
+    }
+  }
+
+  /**
+   * remove a block from the block list. This block should be
+   * the last one on the list.
+   */
+  void removeBlock(Block oldblock) throws IOException {
+    if (this.blocks == null) {
+      throw new IOException("Trying to delete non-existant block " +
+                            oldblock);
+    }
+    int size = this.blocks.length;
+    if (!this.blocks[size-1].equals(oldblock)) {
+      throw new IOException("Trying to delete non-existant block " +
+                            oldblock);
+    }
+    BlockInfo[] newlist = new BlockInfo[size - 1];
+    for (int i = 0; i < size-1; i++) {
+        newlist[i] = this.blocks[i];
+    }
+    this.blocks = newlist;
+  }
+
+  /**
    * Set file block
    */
   void setBlock(int idx, BlockInfo blk) {
@@ -536,5 +584,58 @@
    */
   long getPreferredBlockSize() {
     return preferredBlockSize;
+  }
+
+  /**
+   * Return the penultimate allocated block for this file.
+   */
+  Block getPenultimateBlock() {
+    if (blocks == null || blocks.length <= 1) {
+      return null;
+    }
+    return blocks[blocks.length - 2];
+  }
+}
+
+class INodeFileUnderConstruction extends INodeFile {
+  protected StringBytesWritable clientName;         // lease holder
+  protected StringBytesWritable clientMachine;
+  protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+
+  INodeFileUnderConstruction(short replication,
+                             long preferredBlockSize,
+                             long modTime,
+                             String clientName,
+                             String clientMachine,
+                             DatanodeDescriptor clientNode) 
+                             throws IOException {
+    super(0, replication, modTime, preferredBlockSize);
+    this.clientName = new StringBytesWritable(clientName);
+    this.clientMachine = new StringBytesWritable(clientMachine);
+    this.clientNode = clientNode;
+  }
+
+  String getClientName() throws IOException {
+    return clientName.getString();
+  }
+
+  String getClientMachine() throws IOException {
+    return clientMachine.getString();
+  }
+
+  DatanodeDescriptor getClientNode() {
+    return clientNode;
+  }
+
+  //
+  // converts a INodeFileUnderConstruction into a INodeFile
+  //
+  INodeFile convertToInodeFile() {
+    INodeFile obj = new INodeFile(getBlocks(),
+                                  getReplication(),
+                                  getModificationTime(),
+                                  getPreferredBlockSize());
+    return obj;
+    
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Wed Sep 19 15:12:49 2007
@@ -44,6 +44,7 @@
     modifFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
   static final String SETREP_SHORT_USAGE="-setrep [-R] [-w] <rep> <path/file>";
+  static final String TAIL_USAGE="-tail [-f] <file>";
   private static final DecimalFormat decimalFormat = 
     new DecimalFormat("#*0.0#*");
 
@@ -881,6 +882,54 @@
   }
 
   /**
+   * Parse the incoming command string
+   * @param cmd
+   * @param pos ignore anything before this pos in cmd
+   * @throws IOException 
+   */
+  private void tail(String[] cmd, int pos) throws IOException {
+    CommandFormat c = new CommandFormat("tail", 1, 1, "f");
+    String src = null;
+    Path path = null;
+    short rep = 0;
+
+    try {
+      List<String> parameters = c.parse(cmd, pos);
+      src = parameters.get(0);
+    } catch(IllegalArgumentException iae) {
+      System.err.println("Usage: java FsShell " + TAIL_USAGE);
+      throw iae;
+    }
+    boolean foption = c.options.get("f") ? true: false;
+    path = new Path(src);
+
+    if (fs.isDirectory(path)) {
+      throw new IOException("Source must be a file.");
+    }
+
+    long fileSize = fs.getFileStatus(path).getLen();
+    long offset = (fileSize > 1024) ? fileSize - 1024: 0;
+
+    while (true) {
+      FSDataInputStream in = fs.open(path);
+      in.seek(offset);
+      IOUtils.copyBytes(in, System.out, 1024, false);
+      offset = in.getPos();
+      in.close();
+      if (!foption) {
+        break;
+      }
+      fileSize = fs.getFileStatus(path).getLen();
+      offset = (fileSize > offset) ? offset: fileSize;
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+  }
+
+  /**
    * Return an abbreviated English-language desc of the byte length
    */
   public static String byteDesc(long len) {
@@ -926,6 +975,7 @@
       "[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" +
       "[-mkdir <path>] [-report] [" + SETREP_SHORT_USAGE + "]\n\t" +
       "[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
+      "[-tail [-f] <path>]\n\t" +
       "[-help [cmd]]\n";
 
     String conf ="-conf <configuration file>:  Specify an application configuration file.";
@@ -1025,6 +1075,10 @@
       "\t\tin the specified format. Format accepts filesize in blocks (%b), filename (%n),\n" +
       "\t\tblock size (%o), replication (%r), modification date (%y, %Y)\n";
 
+    String tail = TAIL_USAGE
+      + ":  Show the last 1KB of the file. \n"
+      + "\t\tThe -f option shows apended data as the file grows. \n";
+
     String help = "-help [cmd]: \tDisplays help for given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -1078,6 +1132,8 @@
       System.out.println(test);
     } else if ("stat".equals(cmd)) {
       System.out.println(stat);
+    } else if ("tail".equals(cmd)) {
+      System.out.println(tail);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -1214,6 +1270,8 @@
     } else if ("-stat".equals(cmd)) {
       System.err.println("Usage: java FsShell" +
                          " [-stat [format] <path>]");
+    } else if ("-tail".equals(cmd)) {
+      System.err.println("Usage: java FsShell [" + TAIL_USAGE + "]");
     } else {
       System.err.println("Usage: java FsShell");
       System.err.println("           [-ls <path>]");
@@ -1238,6 +1296,7 @@
       System.err.println("           [-touchz <path>]");
       System.err.println("           [-test -[ezd] <path>]");
       System.err.println("           [-stat [format] <path>]");
+      System.err.println("           [" + TAIL_USAGE + "]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -1370,6 +1429,8 @@
         } else {
           printHelp("");
         }
+      } else if ("-tail".equals(cmd)) {
+        tail(argv, i);           
       } else {
         exitCode = -1;
         System.err.println(cmd.substring(1) + ": Unknown command");

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Wed Sep 19 15:12:49 2007
@@ -27,14 +27,16 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 
 /**
- * This class tests the FileStatus API.
+ * This class tests that a file need not be closed before its
+ * data can be read by another client.
  */
 public class TestFileCreation extends TestCase {
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
-  static final int fileSize = 16384;
+  static final int fileSize = 2 * blockSize;
 
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
@@ -78,11 +80,27 @@
         }
       }
     }
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read the file
+    byte[] actual = new byte[fileSize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, expected, "Read 1");
   }
 
+  private void checkData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                        expected[from+idx]+" actual "+actual[idx],
+                        actual[idx], expected[from+idx]);
+      actual[idx] = 0;
+    }
+  }
 
   /**
-   * Tests various options of File creation.
+   * Test that file data becomes available before file is closed.
    */
   public void testFileCreation() throws IOException {
     Configuration conf = new Configuration();
@@ -115,9 +133,13 @@
       // write to file
       writeFile(stm);
 
-      // close file. This makes all file data visible to clients.
-      stm.close();
+      // verify that file size has changed
+      assertTrue(file1 + " should be of size " + fileSize, 
+                  fs.getFileStatus(file1).getLen() == fileSize);
+
+      // Make sure a client can read it before it is closed.
       checkFile(fs, file1, 1);
+      stm.close();
 
     } finally {
       fs.close();