You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/02 21:59:52 UTC

svn commit: r1164680 - in /hadoop/common/branches/branch-0.20-security: ./ src/core/org/apache/hadoop/io/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/ha...

Author: suresh
Date: Fri Sep  2 19:59:52 2011
New Revision: 1164680

URL: http://svn.apache.org/viewvc?rev=1164680&view=rev
Log:
Porting from 0.20-append branch - HDFS-200. Support append and sync for hadoop 0.20 branch. Contributed by Dhruba Borthakur.


Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  2 19:59:52 2011
@@ -7,6 +7,8 @@ Release 0.20.205.0 - unreleased
     HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
     datanodes without restarting.  (Eric Payne via szetszwo)
 
+    HDFS-200. Support append and sync for hadoop 0.20 branch. (dhruba)
+
   BUG FIXES
 
     MAPREDUCE-2324. Removed usage of broken

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java Fri Sep  2 19:59:52 2011
@@ -938,6 +938,13 @@ public class SequenceFile {
       }
     }
 
+    /** flush all currently written data to the file system */
+    public void syncFs() throws IOException {
+      if (out != null) {
+        out.sync();                               // flush contents to file system
+      }
+    }
+
     /** Returns the configuration of this file. */
     Configuration getConf() { return conf; }
     

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep  2 19:59:52 2011
@@ -1604,7 +1604,10 @@ public class DFSClient implements FSCons
         throw new FileNotFoundException("File does not exist: " + src);
       }
 
-      if (locatedBlocks != null) {
+      // I think this check is not correct. A file could have been appended to
+      // between two calls to openInfo().
+      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
+          !newInfo.isUnderConstruction()) {
         Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
         Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
         while (oldIter.hasNext() && newIter.hasNext()) {
@@ -1613,6 +1616,39 @@ public class DFSClient implements FSCons
           }
         }
       }
+
+      // if the file is under construction, then fetch size of last block
+      // from datanode.
+      if (newInfo.isUnderConstruction() && newInfo.locatedBlockCount() > 0) {
+        LocatedBlock last = newInfo.get(newInfo.locatedBlockCount()-1);
+        boolean lastBlockInFile = (last.getStartOffset() + 
+                                   last.getBlockSize() == 
+                                   newInfo.getFileLength()); 
+        if (lastBlockInFile && last.getLocations().length > 0) {
+          ClientDatanodeProtocol primary =  null;
+          DatanodeInfo primaryNode = last.getLocations()[0];
+          try {
+            primary = createClientDatanodeProtocolProxy(primaryNode, conf,
+                last.getBlock(), last.getBlockToken(), socketTimeout);
+            Block newBlock = primary.getBlockInfo(last.getBlock());
+            long newBlockSize = newBlock.getNumBytes();
+            long delta = newBlockSize - last.getBlockSize();
+            // if the size of the block on the datanode is different
+            // from what the NN knows about, the datanode wins!
+            last.getBlock().setNumBytes(newBlockSize);
+            long newlength = newInfo.getFileLength() + delta;
+            newInfo.setFileLength(newlength);
+            LOG.debug("DFSClient setting last block " + last + 
+                      " to length " + newBlockSize +
+                      " filesize is now " + newInfo.getFileLength());
+          } catch (IOException e) {
+            LOG.debug("DFSClient file " + src + 
+                      " is being concurrently append to" +
+                      " but datanode " + primaryNode.getHostName() +
+                      " probably does not have block " + last.getBlock());
+          }
+        }
+      }
       this.locatedBlocks = newInfo;
       this.currentNode = null;
     }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Sep  2 19:59:52 2011
@@ -32,9 +32,9 @@ public interface ClientDatanodeProtocol 
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 4: never return null and always return a newly generated access token
+   * 5: added getBlockInfo
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
 
   /** Start generation-stamp recovery for specified block
    * @param block the specified block
@@ -47,4 +47,12 @@ public interface ClientDatanodeProtocol 
    */
   LocatedBlock recoverBlock(Block block, boolean keepLength,
       DatanodeInfo[] targets) throws IOException;
+
+  /** Returns the Block object held by the specified Datanode for the
+   * given block.
+   * @param block the specified block
+   * @return the Block object from the specified Datanode
+   * @throws IOException if the block does not exist
+   */
+  Block getBlockInfo(Block block) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Sep  2 19:59:52 2011
@@ -85,6 +85,13 @@ public class LocatedBlocks implements Wr
   public boolean isUnderConstruction() {
     return underConstruction;
   }
+
+  /**
+   * Sets the file length of the file.
+   */
+  public void setFileLength(long length) {
+    this.fileLength = length;
+  }
   
   /**
    * Find block containing specified offset.

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Sep  2 19:59:52 2011
@@ -97,7 +97,7 @@ class BlockReceiver implements java.io.C
       // Open local disk out
       //
       streams = datanode.data.writeToBlock(block, isRecovery);
-      this.finalized = datanode.data.isValidBlock(block);
+      this.finalized = false;
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Sep  2 19:59:52 2011
@@ -1841,6 +1841,12 @@ public class DataNode extends Configured
     return recoverBlock(block, keepLength, targets, false);
   }
 
+  /** {@inheritDoc} */
+  public Block getBlockInfo(Block block) throws IOException {
+    Block stored = data.getStoredBlock(block.getBlockId());
+    return stored;
+  }
+
   private static void logRecoverBlock(String who,
       Block block, DatanodeID[] targets) {
     StringBuilder msg = new StringBuilder(targets[0].getName());

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Sep  2 19:59:52 2011
@@ -1136,12 +1136,12 @@ public class FSDataset implements FSCons
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
         f = createTmpFile(v, b);
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       } else if (f != null) {
         DataNode.LOG.info("Reopen already-open Block for append " + b);
         // create or reuse temporary file to hold block in the designated volume
         v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       } else {
         // reopening block for appending to it.
         DataNode.LOG.info("Reopen Block for append " + b);
@@ -1172,7 +1172,7 @@ public class FSDataset implements FSCons
                                   " to tmp dir " + f);
           }
         }
-        volumeMap.put(b, new DatanodeBlockInfo(v));
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       }
       if (f == null) {
         DataNode.LOG.warn("Block " + b + " reopen failed " +

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Sep  2 19:59:52 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -391,11 +392,28 @@ public class DatanodeDescriptor extends 
     // Note we are taking special precaution to limit tmp blocks allocated
     // as part this block report - which why block list is stored as longs
     Block iblk = new Block(); // a fixed new'ed block to be reused with index i
+    Block oblk = new Block(); // for fixing genstamps
     for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
                newReport.getBlockGenStamp(i));
       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       if(storedBlock == null) {
+        // if the block with a WILDCARD generation stamp matches 
+        // then accept this block.
+        // This block has a different generation stamp on the datanode 
+        // because of a lease-recovery-attempt.
+        oblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
+                 GenerationStamp.WILDCARD_STAMP);
+        storedBlock = blocksMap.getStoredBlock(oblk);
+        if (storedBlock != null && storedBlock.getINode() != null &&
+            (storedBlock.getGenerationStamp() <= iblk.getGenerationStamp() ||
+             storedBlock.getINode().isUnderConstruction())) {
+          // accept block. It will be cleaned up on cluster restart.
+        } else {
+          storedBlock = null;
+        }
+      }
+      if(storedBlock == null) {
         // If block is not in blocksMap it does not belong to any file
         toInvalidate.add(new Block(iblk));
         continue;

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep  2 19:59:52 2011
@@ -1992,11 +1992,30 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * This is invoked when a lease expires. On lease expiry, 
+   * all the files that were written from that dfsclient should be
+   * recovered.
+   */
+  void internalReleaseLease(Lease lease, String src) throws IOException {
+    if (lease.hasPath()) {
+      // make a copy of the paths because internalReleaseLeaseOne removes
+      // pathnames from the lease record.
+      String[] leasePaths = new String[lease.getPaths().size()];
+      lease.getPaths().toArray(leasePaths);
+      for (String p: leasePaths) {
+        internalReleaseLeaseOne(lease, p);
+      }
+    } else {
+      internalReleaseLeaseOne(lease, src);
+    }
+  }
+
+  /**
    * Move a file that is being written to be immutable.
    * @param src The filename
    * @param lease The lease for the client creating the file
    */
-  void internalReleaseLease(Lease lease, String src) throws IOException {
+  void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
 
     INodeFile iFile = dir.getFileINode(src);
@@ -2104,20 +2123,11 @@ public class FSNamesystem implements FSC
         descriptors = new DatanodeDescriptor[newtargets.length];
         for(int i = 0; i < newtargets.length; i++) {
           descriptors[i] = getDatanode(newtargets[i]);
-        }
-      }
-      if (closeFile) {
-        // the file is getting closed. Insert block locations into blocksMap.
-        // Otherwise fsck will report these blocks as MISSING, especially if the
-        // blocksReceived from Datanodes take a long time to arrive.
-        for (int i = 0; i < descriptors.length; i++) {
           descriptors[i].addBlock(newblockinfo);
         }
-        pendingFile.setLastBlock(newblockinfo, null);
-      } else {
-        // add locations into the INodeUnderConstruction
-        pendingFile.setLastBlock(newblockinfo, descriptors);
       }
+      // add locations into the INodeUnderConstruction
+      pendingFile.setLastBlock(newblockinfo, descriptors);
     }
 
     // If this commit does not want to close the file, persist
@@ -2546,9 +2556,11 @@ public class FSNamesystem implements FSC
           LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
           break;
         } catch (IOException ie) {
-          LOG.warn("ReplicationMonitor thread received exception. " + ie);
+          LOG.warn("ReplicationMonitor thread received exception. " + ie +  " " +
+                   StringUtils.stringifyException(ie));
         } catch (Throwable t) {
-          LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
+          LOG.warn("ReplicationMonitor thread received Runtime exception. " + t + " " +
+                   StringUtils.stringifyException(t));
           Runtime.getRuntime().exit(-1);
         }
       }
@@ -3231,6 +3243,24 @@ public class FSNamesystem implements FSC
                                     DatanodeDescriptor node,
                                     DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    if (storedBlock == null) {
+      // if the block with a WILDCARD generation stamp matches and the
+      // corresponding file is under construction, then accept this block.
+      // This block has a different generation stamp on the datanode 
+      // because of a lease-recovery-attempt.
+      Block nblk = new Block(block.getBlockId());
+      storedBlock = blocksMap.getStoredBlock(nblk);
+      if (storedBlock != null && storedBlock.getINode() != null &&
+          (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
+           storedBlock.getINode().isUnderConstruction())) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+          + "addStoredBlock request received for " + block + " on "
+          + node.getName() + " size " + block.getNumBytes()
+          + " and it belongs to a file under construction. ");
+      } else {
+        storedBlock = null;
+      }
+    }
     if(storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -3251,6 +3281,8 @@ public class FSNamesystem implements FSC
     if (block != storedBlock) {
       if (block.getNumBytes() >= 0) {
         long cursize = storedBlock.getNumBytes();
+        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
+        boolean underConstruction = (file == null ? false : file.isUnderConstruction());
         if (cursize == 0) {
           storedBlock.setNumBytes(block.getNumBytes());
         } else if (cursize != block.getNumBytes()) {
@@ -3262,9 +3294,11 @@ public class FSNamesystem implements FSC
             if (cursize > block.getNumBytes()) {
               // new replica is smaller in size than existing block.
               // Mark the new replica as corrupt.
-              LOG.warn("Mark new replica " + block + " from " + node.getName() + 
-                  "as corrupt because its length is shorter than existing ones");
-              markBlockAsCorrupt(block, node);
+              if (!underConstruction) {
+                LOG.warn("Mark new replica " + block + " from " + node.getName() + 
+                    "as corrupt because its length is shorter than existing ones");
+                markBlockAsCorrupt(block, node);
+              }
             } else {
               // new replica is larger in size than existing block.
               // Mark pre-existing replicas as corrupt.
@@ -3278,7 +3312,7 @@ public class FSNamesystem implements FSC
                   nodes[count++] = dd;
                 }
               }
-              for (int j = 0; j < count; j++) {
+              for (int j = 0; j < count && !underConstruction; j++) {
                 LOG.warn("Mark existing replica " + block + " from " + node.getName() + 
                 " as corrupt because its length is shorter than the new one");
                 markBlockAsCorrupt(block, nodes[j]);
@@ -3301,7 +3335,6 @@ public class FSNamesystem implements FSC
         }
         
         //Updated space consumed if required.
-        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
         long diff = (file == null) ? 0 :
                     (file.getPreferredBlockSize() - storedBlock.getNumBytes());