You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2016/01/05 20:52:10 UTC

[10/50] [abbrv] hadoop git commit: [partial-ns] Implement getAdditionalBlock().

[partial-ns] Implement getAdditionalBlock().


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a36f5bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a36f5bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a36f5bc

Branch: refs/heads/feature-HDFS-8286
Commit: 6a36f5bcc4edfba4e9779b0fd19923179138b551
Parents: e9c9c72b
Author: Haohui Mai <wh...@apache.org>
Authored: Thu May 14 17:02:18 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jun 12 13:56:57 2015 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    | 109 +++++---
 .../hdfs/server/blockmanagement/BlocksMap.java  |   8 +-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  | 247 ++++++++++---------
 .../hadoop/hdfs/server/namenode/FSEditLog.java  |  11 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  73 +++++-
 .../hdfs/server/namenode/RWTransaction.java     |   4 +
 .../hadoop/hdfs/server/namenode/Resolver.java   |   4 +
 7 files changed, 298 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6139e37..9b18f45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FlatINodeFileFeature;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@@ -638,17 +640,42 @@ public class BlockManager {
   }
 
   /**
-   * Convert a specified block of the file to a complete block.
-   * @param bc file
-   * @param blkIndex  block index in the file
-   * @throws IOException if the block does not have at least a minimal number
-   * of replicas reported from data-nodes.
+   * Commit or complete the last block. Return the length of the last block
    */
+  public long commitOrCompleteLastBlock(FlatINodeFileFeature file, Block
+      commitBlock) throws IOException {
+    if(commitBlock == null)
+      return 0; // not committing, this is a block allocation retry
+    BlockInfoContiguous lastBlock = getStoredBlock(file.lastBlock());
+    if(lastBlock == null)
+      return 0; // no blocks in file yet
+    if(lastBlock.isComplete())
+      return lastBlock.getNumBytes(); // already completed (e.g. by syncBlock)
+
+    commitBlock((BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
+    if(countNodes(lastBlock).liveReplicas() >= minReplication) {
+      return completeBlock(lastBlock, false).getNumBytes();
+    }
+    return lastBlock.getNumBytes();
+  }
+
   private BlockInfoContiguous completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
-    BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
+    BlockInfoContiguous block = completeBlock(bc.getBlocks()[blkIndex], force);
+    // replace the block at blkIndex in the file with its completed form
+    bc.setBlock(blkIndex, block);
+    return block;
+  }
+
+  /**
+   * Convert a specified block of the file to a complete block.
+   * @throws IOException if the block does not have at least a minimal number
+   * of replicas reported from data-nodes.
+   */
+  private BlockInfoContiguous completeBlock(BlockInfoContiguous curBlock,
+      boolean force) throws IOException {
     if(curBlock.isComplete())
       return curBlock;
     BlockInfoContiguousUnderConstruction ucBlock =
@@ -661,9 +688,7 @@ public class BlockManager {
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
     BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock();
-    // replace penultimate block in file
-    bc.setBlock(blkIndex, completeBlock);
-    
+
     // Since safe-mode only counts complete blocks, and we now have
     // one more complete block, we need to adjust the total up, and
     // also count it as safe, if we have at least the minimum replica
@@ -2800,10 +2825,10 @@ public class BlockManager {
       processOverReplicatedBlock(block, expectedReplication, null, null);
       return MisReplicationResult.OVER_REPLICATED;
     }
-    
+
     return MisReplicationResult.OK;
   }
-  
+
   /** Set replication for the blocks. */
   public void setReplication(final short oldRepl, final short newRepl,
       final String src, final Block b) {
@@ -2812,7 +2837,7 @@ public class BlockManager {
     }
 
     // update needReplication priority queues
-    updateNeededReplications(b, 0, newRepl-oldRepl);
+    updateNeededReplications(b, 0, newRepl - oldRepl);
 
     if (oldRepl > newRepl) {
       // old replication > the new one; need to remove copies
@@ -2845,8 +2870,7 @@ public class BlockManager {
       if (storage.areBlockContentsStale()) {
         LOG.info("BLOCK* processOverReplicatedBlock: " +
             "Postponing processing of over-replicated " +
-            block + " since storage + " + storage
-            + "datanode " + cur + " does not yet have up-to-date " +
+                     block + " since storage + " + storage + "datanode " + cur + " does not yet have up-to-date " +
             "block information.");
         postponeBlock(block);
         return;
@@ -2862,8 +2886,8 @@ public class BlockManager {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint, blockplacement);
+    chooseExcessReplicates(nonExcess, block, replication, addedNode,
+                           delNodeHint, blockplacement);
   }
 
 
@@ -2969,15 +2993,17 @@ public class BlockManager {
 
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(
+        dn.getDatanodeUuid());
     if (excessBlocks == null) {
       excessBlocks = new LightWeightLinkedSet<Block>();
       excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
       excessBlocksCount.incrementAndGet();
-      blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
-          + " excessReplicateMap", dn, block);
+      blockLog.debug(
+          "BLOCK* addToExcessReplicate: ({}, {}) is added to" + " excessReplicateMap",
+          dn, block);
     }
   }
 
@@ -2985,8 +3011,7 @@ public class BlockManager {
       DatanodeDescriptor node) {
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
-      queueReportedBlock(storageInfo, block, null,
-          QUEUE_REASON_FUTURE_GENSTAMP);
+      queueReportedBlock(storageInfo, block, null, QUEUE_REASON_FUTURE_GENSTAMP);
       return;
     }
     removeStoredBlock(block, node);
@@ -3093,7 +3118,7 @@ public class BlockManager {
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-  
+
   private void processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
@@ -3400,21 +3425,44 @@ public class BlockManager {
     }
   }
 
+  public boolean checkBlocksProperlyReplicated(String src,
+      final BlockInfoContiguous[] blocks) {
+    return checkBlocksProperlyReplicated(src, new Iterable<Block>() {
+      @Override
+      public Iterator<Block> iterator() {
+        return new Iterator<Block>() {
+          private int index;
+          @Override
+          public boolean hasNext() {
+            return index < blocks.length;
+          }
+
+          @Override
+          public Block next() {
+            return blocks[index++];
+          }
+        };
+      }
+    });
+  }
+
   /**
    * Check that the indicated blocks are present and
    * replicated.
    */
-  public boolean checkBlocksProperlyReplicated(
-      String src, BlockInfoContiguous[] blocks) {
-    for (BlockInfoContiguous b: blocks) {
+  public boolean checkBlocksProperlyReplicated(String src, Iterable<Block>
+      blocks) {
+    for (Block bid : blocks) {
+      BlockInfoContiguous b = bid instanceof BlockInfoContiguous
+          ? (BlockInfoContiguous) bid : getStoredBlock(bid);
       if (!b.isComplete()) {
         final BlockInfoContiguousUnderConstruction uc =
             (BlockInfoContiguousUnderConstruction)b;
         final int numNodes = b.numNodes();
         LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
-          + uc.getBlockUCState() + ", replication# = " + numNodes
-          + (numNodes < minReplication ? " < ": " >= ")
-          + " minimum = " + minReplication + ") in file " + src);
+            + uc.getBlockUCState() + ", replication# = " + numNodes
+            + (numNodes < minReplication ? " < ": " >= ")
+            + " minimum = " + minReplication + ") in file " + src);
         return false;
       }
     }
@@ -3527,6 +3575,11 @@ public class BlockManager {
     return blocksMap.addBlockCollection(block, bc);
   }
 
+  public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
+      long bcId) {
+    return blocksMap.addBlockCollection(block, bcId);
+  }
+
   public long getBlockCollectionId(Block b) {
     BlockInfoContiguous bi = getStoredBlock(b);
     return bi == null ? INodeId.INVALID_INODE_ID : bi.getBlockCollectionId();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 9a1dc29..e18e384 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -97,16 +97,20 @@ class BlocksMap {
     }
   }
 
+  BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
+    return addBlockCollection(b, bc.getId());
+  }
+
   /**
    * Add block b belonging to the specified block collection to the map.
    */
-  BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
+  BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, long bcId) {
     BlockInfoContiguous info = blocks.get(b);
     if (info != b) {
       info = b;
       blocks.put(info);
     }
-    info.setBlockCollectionId(bc.getId());
+    info.setBlockCollectionId(bcId);
     return info;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 33e31e7..a136b1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -173,34 +175,34 @@ class FSDirWriteFileOp {
     final byte storagePolicyID;
     String clientMachine;
 
-    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
-    src = fsn.dir.resolvePath(pc, src, pathComponents);
-    FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
-                                           previous, onRetryBlock);
-    final INodeFile pendingFile = fileState.inode;
-    // Check if the penultimate block is minimally replicated
-    if (!fsn.checkFileProgress(src, pendingFile, false)) {
-      throw new NotReplicatedYetException("Not replicated yet: " + src);
-    }
+    FSDirectory fsd = fsn.getFSDirectory();
+    try (ROTransaction tx = fsd.newROTransaction().begin()) {
+      FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
+                                             previous, onRetryBlock);
+      final FlatINode pendingFile = fileState.inode;
+      FlatINodeFileFeature f = pendingFile.feature(FlatINodeFileFeature.class);
+      // Check if the penultimate block is minimally replicated
+      if (!fsn.checkFileProgress(src, f, false)) {
+        throw new NotReplicatedYetException("Not replicated yet: " + src);
+      }
 
-    if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
-      // This is a retry. No need to generate new locations.
-      // Use the last block if it has locations.
-      return null;
-    }
-    if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
-      throw new IOException("File has reached the limit on maximum number of"
-          + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
-          + "): " + pendingFile.getBlocks().length + " >= "
-          + fsn.maxBlocksPerFile);
+      if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
+        // This is a retry. No need to generate new locations.
+        // Use the last block if it has locations.
+        return null;
+      }
+      if (f.numBlocks() >= fsn.maxBlocksPerFile) {
+        throw new IOException("File has reached the limit on maximum number of"
+            + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+            + "): " + f.numBlocks() + " >= " + fsn.maxBlocksPerFile);
+      }
+      blockSize = f.blockSize();
+      clientMachine = f.clientMachine();
+      replication = f.replication();
+      storagePolicyID = f.storagePolicyId();
+      return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
+                                      clientMachine);
     }
-    blockSize = pendingFile.getPreferredBlockSize();
-    clientMachine = pendingFile.getFileUnderConstructionFeature()
-        .getClientMachine();
-    replication = pendingFile.getFileReplication();
-    storagePolicyID = pendingFile.getStoragePolicyID();
-    return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
-                                    clientMachine);
   }
 
   static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
@@ -226,39 +228,50 @@ class FSDirWriteFileOp {
     // Run the full analysis again, since things could have changed
     // while chooseTarget() was executing.
     LocatedBlock[] onRetryBlock = new LocatedBlock[1];
-    FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
-                                           previous, onRetryBlock);
-    final INodeFile pendingFile = fileState.inode;
-    src = fileState.path;
-
-    if (onRetryBlock[0] != null) {
-      if (onRetryBlock[0].getLocations().length > 0) {
-        // This is a retry. Just return the last block if having locations.
-        return onRetryBlock[0];
-      } else {
-        // add new chosen targets to already allocated block and return
-        BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
-        ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-            .setExpectedLocations(targets);
-        offset = pendingFile.computeFileSize();
-        return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+    FSDirectory fsd = fsn.getFSDirectory();
+    BlockManager bm = fsn.getBlockManager();
+    try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+      FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
+                                             previous, onRetryBlock);
+      final FlatINode inode = fileState.inode;
+      FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+      src = fileState.path;
+
+      if (onRetryBlock[0] != null) {
+        if (onRetryBlock[0].getLocations().length > 0) {
+          // This is a retry. Just return the last block if having locations.
+          return onRetryBlock[0];
+        } else {
+          // add new chosen targets to already allocated block and return
+          Block lastBlock = file.lastBlock();
+          BlockInfoContiguous lastBlockInFile = bm.getStoredBlock(lastBlock);
+          ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
+              .setExpectedLocations(targets);
+          offset = file.fileSize();
+          return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+        }
       }
-    }
 
-    // commit the last block and complete it if it has minimum replicas
-    fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
-                                  ExtendedBlock.getLocalBlock(previous));
+      // commit the last block and complete it if it has minimum replicas
+      FlatINodeFileFeature.Builder newFile = fsn.commitOrCompleteLastBlock(
+          file, ExtendedBlock.getLocalBlock(previous));
 
-    // allocate new block, record block locations in INode.
-    Block newBlock = fsn.createNewBlock();
-    INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
-    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+      // allocate new block, record block locations in INode.
+      Block newBlock = fsn.createNewBlock();
+      saveAllocatedBlock(fsn, src, inode, newBlock, targets);
+      FlatINode newInode = persistNewBlock(tx, src, inode, newFile, newBlock);
+      offset = newInode.<FlatINodeFileFeature>feature(
+          FlatINodeFileFeature.class).fileSize();
 
-    persistNewBlock(fsn, src, pendingFile);
-    offset = pendingFile.computeFileSize();
+      // TODO: Update quota
+      // check quota limits and update the space consumed
+//      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+//                      fileINode.getFileReplication(), true);
 
-    // Return located block
-    return makeLocatedBlock(fsn, newBlock, targets, offset);
+      tx.commit();
+      // Return located block
+      return makeLocatedBlock(fsn, newBlock, targets, offset);
+    }
   }
 
   static DatanodeStorageInfo[] chooseTargetForNewBlock(
@@ -553,72 +566,56 @@ class FSDirWriteFileOp {
    * Add a block to the file. Returns a reference to the added block.
    */
   private static BlockInfoContiguous addBlock(
-      FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
+      BlockManager bm, String path, FlatINode inode, Block block,
       DatanodeStorageInfo[] targets) throws IOException {
-    fsd.writeLock();
-    try {
-      final INodeFile fileINode = inodesInPath.getLastINode().asFile();
-      Preconditions.checkState(fileINode.isUnderConstruction());
-
-      // check quota limits and updated space consumed
-      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getFileReplication(), true);
-
-      // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
+    FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+    Preconditions.checkState(f.inConstruction());
+    // associate new last block for the file
+    BlockInfoContiguousUnderConstruction blockInfo =
         new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
+            block, f.replication(),
             HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
             targets);
-      fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
-      fileINode.addBlock(blockInfo);
 
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
-            + path + " with " + block
-            + " block is added to the in-memory "
-            + "file system");
-      }
-      return blockInfo;
-    } finally {
-      fsd.writeUnlock();
+    bm.addBlockCollection(blockInfo, inode.id());
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " + path
+          + " with " + block + " block is added to the in-memory file system");
     }
+    return blockInfo;
   }
 
   private static FileState analyzeFileState(
-      FSNamesystem fsn, String src, long fileId, String clientName,
+      Transaction tx, FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
           throws IOException  {
     assert fsn.hasReadLock();
-
+    BlockManager bm = fsn.getBlockManager();
     checkBlock(fsn, previous);
     onRetryBlock[0] = null;
-    fsn.checkNameNodeSafeMode("Cannot add block to " + src);
-
-    // have we exceeded the configured limit of fs objects.
-    fsn.checkFsObjectLimit();
 
     Block previousBlock = ExtendedBlock.getLocalBlock(previous);
-    final INode inode;
-    final INodesInPath iip;
-    if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+    final Resolver.Result paths;
+    if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
       // Older clients may not have given us an inode ID to work with.
       // In this case, we have to try to resolve the path and hope it
       // hasn't changed or been deleted since the file was opened for write.
-      iip = fsn.dir.getINodesInPath4Write(src);
-      inode = iip.getLastINode();
+      paths = Resolver.resolve(tx, src);
     } else {
       // Newer clients pass the inode ID, so we can just get the inode
       // directly.
-      inode = fsn.dir.getInode(fileId);
-      iip = INodesInPath.fromINode(inode);
-      if (inode != null) {
-        src = iip.getPath();
-      }
+      paths = Resolver.resolveById(tx, fileId);
+    }
+    if (paths.invalidPath()) {
+      throw new InvalidPathException(src);
+    } else if (paths.notFound()) {
+      throw new FileNotFoundException(src);
     }
-    final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
-    BlockInfoContiguous lastBlockInFile = file.getLastBlock();
+    FlatINode inode = paths.inodesInPath().getLastINode();
+    fsn.checkLease(src, clientName, inode);
+    FlatINodeFileFeature pendingFile = inode.feature(FlatINodeFileFeature.class);
+    BlockInfoContiguous lastBlockInFile = pendingFile.lastBlock() == null ?
+        null : bm.getStoredBlock(pendingFile.lastBlock());
     if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
       // The block that the client claims is the current last block
       // doesn't match up with what we think is the last block. There are
@@ -646,10 +643,11 @@ class FSDirWriteFileOp {
       //    changed the namesystem state yet.
       //    We run this analysis again in Part II where case 4 is impossible.
 
-      BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
+      BlockInfoContiguous penultimateBlock = bm.getStoredBlock(
+          pendingFile.penultimateBlock());
       if (previous == null &&
           lastBlockInFile != null &&
-          lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
+          lastBlockInFile.getNumBytes() >= pendingFile.blockSize() &&
           lastBlockInFile.isComplete()) {
         // Case 1
         if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -668,15 +666,15 @@ class FSDirWriteFileOp {
 
         // Case 2
         // Return the last block.
-        NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
-            "allocation of a new block in " + src + ". Returning previously" +
-            " allocated block " + lastBlockInFile);
-        long offset = file.computeFileSize();
+        NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
+            "caught retry for allocation of a new block in " +
+            src + ". Returning previously allocated block " + lastBlockInFile);
+        long offset = pendingFile.fileSize();
         BlockInfoContiguousUnderConstruction lastBlockUC =
             (BlockInfoContiguousUnderConstruction) lastBlockInFile;
         onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
             lastBlockUC.getExpectedStorageLocations(), offset);
-        return new FileState(file, src, iip);
+        return new FileState(inode, src);
       } else {
         // Case 3
         throw new IOException("Cannot allocate block in " + src + ": " +
@@ -684,7 +682,7 @@ class FSDirWriteFileOp {
                                   "last block in file " + lastBlockInFile);
       }
     }
-    return new FileState(file, src, iip);
+    return new FileState(inode, src);
   }
 
   static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
@@ -779,15 +777,25 @@ class FSDirWriteFileOp {
   /**
    * Persist the new block (the last block of the given file).
    */
-  private static void persistNewBlock(
-      FSNamesystem fsn, String path, INodeFile file) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    fsn.getEditLog().logAddBlock(path, file);
+  private static FlatINode persistNewBlock(
+      RWTransaction tx, String path, FlatINode inode,
+      FlatINodeFileFeature.Builder newFile, Block newBlock) {
+    Preconditions.checkArgument(newFile.inConstruction());
+    newFile.addBlock(newBlock);
+    FlatINodeFileFeature newFeature = FlatINodeFileFeature.wrap(newFile.build());
+
+    FlatINode.Builder builder = new FlatINode.Builder()
+        .mergeFrom(inode).replaceFeature(newFeature);
+    ByteString newFileBytes = builder.build();
+    FlatINode newInode = FlatINode.wrap(newFileBytes);
+    tx.putINode(inode.id(), newFileBytes);
+    tx.logAddBlock(path, newFeature);
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("persistNewBlock: "
-              + path + " with new block " + file.getLastBlock().toString()
-              + ", current total block count is " + file.getBlocks().length);
+      NameNode.stateChangeLog.debug("persistNewBlock: " + path
+          + " with new block " + newBlock + ", current total block count is "
+          + newFeature.numBlocks());
     }
+    return newInode;
   }
 
   /**
@@ -795,19 +803,18 @@ class FSDirWriteFileOp {
    *
    * @param fsn FSNamesystem
    * @param src path to the file
-   * @param inodesInPath representing each of the components of src.
-   *                     The last INode is the INode for {@code src} file.
+   * @param inode the file
    * @param newBlock newly allocated block to be save
    * @param targets target datanodes where replicas of the new block is placed
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
   private static void saveAllocatedBlock(
-      FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
+      FSNamesystem fsn, String src, FlatINode inode, Block newBlock,
       DatanodeStorageInfo[] targets)
       throws IOException {
     assert fsn.hasWriteLock();
-    BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
-                                     targets);
+    BlockManager bm = fsn.getBlockManager();
+    BlockInfoContiguous b = addBlock(bm, src, inode, newBlock, targets);
     NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
@@ -842,14 +849,12 @@ class FSDirWriteFileOp {
   }
 
   private static class FileState {
-    final INodeFile inode;
+    final FlatINode inode;
     final String path;
-    final INodesInPath iip;
 
-    FileState(INodeFile inode, String fullPath, INodesInPath iip) {
+    FileState(FlatINode inode, String fullPath) {
       this.inode = inode;
       this.path = fullPath;
-      this.iip = iip;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 370050d..739b2d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -772,12 +772,11 @@ public class FSEditLog implements LogsPurgeable {
     logEdit(op);
   }
 
-  public void logAddBlock(String path, INodeFile file) {
-    Preconditions.checkArgument(file.isUnderConstruction());
-    BlockInfoContiguous[] blocks = file.getBlocks();
-    Preconditions.checkState(blocks != null && blocks.length > 0);
-    BlockInfoContiguous pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
-    BlockInfoContiguous lastBlock = blocks[blocks.length - 1];
+  public void logAddBlock(String path, FlatINodeFileFeature file) {
+    Preconditions.checkArgument(file.inConstruction());
+    Preconditions.checkState(file.numBlocks() > 0);
+    Block pBlock = file.penultimateBlock();
+    Block lastBlock = file.lastBlock();
     AddBlockOp op = AddBlockOp.getInstance(cache.get()).setPath(path)
         .setPenultimateBlock(pBlock).setLastBlock(lastBlock);
     logEdit(op);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed28547..5c12b50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2761,6 +2761,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     readLock();
     try {
       checkOperation(OperationCategory.READ);
+      checkNameNodeSafeMode("Cannot add block to " + src);
       r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
                                             previous, onRetryBlock);
     } finally {
@@ -2781,6 +2782,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LocatedBlock lb;
     try {
       checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot add block to " + src);
       lb = FSDirWriteFileOp.storeAllocatedBlock(
           this, src, fileId, clientName, previous, targets);
     } finally {
@@ -2919,7 +2921,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
     return file;
   }
- 
+
+  void checkLease(String src, String holder, FlatINode inode)
+      throws LeaseExpiredException, FileNotFoundException {
+    assert hasReadLock();
+    final String ident = src;
+    if (inode == null) {
+      throw new FileNotFoundException(src);
+    } else if (!inode.isFile()) {
+      Lease lease = leaseManager.getLease(holder);
+      throw new LeaseExpiredException(
+          "No lease on " + ident + ": INode is not a regular file. "
+              + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
+    }
+    FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+    if (!f.inConstruction()) {
+      Lease lease = leaseManager.getLease(holder);
+      throw new LeaseExpiredException(
+          "No lease on " + ident + ": File is not open for writing. "
+              + (lease != null ? lease.toString()
+              : "Holder " + holder + " does not have any open files."));
+    }
+    String clientName = f.clientName();
+    if (holder != null && !clientName.equals(holder)) {
+      throw new LeaseExpiredException("Lease mismatch on " + ident +
+          " owned by " + clientName + " but is accessed by " + holder);
+    }
+  }
+
   /**
    * Complete in-progress write to the given file.
    * @return true if successful, false if the client should continue to retry
@@ -2977,6 +3007,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   /**
+   * Check that the indicated file's blocks are present and
+   * replicated.  If not, return false. If checkall is true, then check
+   * all blocks, otherwise check only penultimate block.
+   */
+  boolean checkFileProgress(String src, FlatINodeFileFeature v,
+      boolean checkall) {
+    assert hasReadLock();
+    if (checkall) {
+      return blockManager.checkBlocksProperlyReplicated(src, v.blocks());
+    } else {
+      // check the penultimate block of this file
+      Block b = v.penultimateBlock();
+      return b == null ||
+          blockManager.checkBlocksProperlyReplicated(src, Lists.newArrayList
+              (b));
+    }
+  }
+
+  /**
    * Change the indicated filename. 
    * @deprecated Use {@link #renameTo(String, String, boolean,
    * Options.Rename...)} instead.
@@ -3537,6 +3586,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  FlatINodeFileFeature.Builder commitOrCompleteLastBlock(
+      FlatINodeFileFeature f, Block commitBlock)
+      throws IOException {
+    assert hasWriteLock();
+    Preconditions.checkArgument(f.inConstruction());
+    if (commitBlock == null) {
+      return new FlatINodeFileFeature.Builder().mergeFrom(f);
+    }
+
+    Preconditions.checkState(f.numBlocks() > 0);
+    long newBlockLength =
+        blockManager.commitOrCompleteLastBlock(f, commitBlock);
+    FlatINodeFileFeature.Builder b = new FlatINodeFileFeature.Builder()
+        .mergeFrom(f);
+    Block newBlock = new Block(commitBlock);
+    newBlock.setNumBytes(newBlockLength);
+    b.block(f.numBlocks() - 1, newBlock);
+    return b;
+    // TODO: Update quota
+    // Adjust disk space consumption if required
+  }
+
   void finalizeINodeFileUnderConstruction(
       String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
     assert hasWriteLock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
index b97b11f..4db8fad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
@@ -159,4 +159,8 @@ class RWTransaction extends Transaction {
       boolean overwrite, boolean logRetryCache) {
     fsd.getEditLog().logOpenFile(ugid, src, inode, overwrite, logRetryCache);
   }
+
+  public void logAddBlock(String src, FlatINodeFileFeature file) {
+    fsd.getEditLog().logAddBlock(src, file);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
index 5420084..06f9e90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
@@ -127,6 +127,10 @@ class Resolver {
     return resolve(tx, path);
   }
 
+  public static Result resolveById(Transaction tx, long id) {
+    throw new IllegalArgumentException("Unimplemented");
+  }
+
 //  public static Result getInodeById(Transaction tx, long id)
 //    throws IOException {
 //    byte[] inodeKey = Encoding.inodeIdKey(id);