You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/17 00:57:36 UTC

svn commit: r496897 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Jan 16 15:57:34 2007
New Revision: 496897

URL: http://svn.apache.org/viewvc?view=rev&rev=496897
Log:
HADOOP-855.  In HDFS, try to repair files with checksum errors.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 15:57:34 2007
@@ -28,6 +28,10 @@
  7. HADOOP-801.  Add to jobtracker a log of task completion events.
     (Sanjay Dahiya via cutting)
 
+ 8. HADOOP-855.  In HDFS, try to repair files with checksum errors.
+    An exception is still thrown, but corrupt blocks are now removed
+    when they have replicas.  (Wendy Chien via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Jan 16 15:57:34 2007
@@ -29,7 +29,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 5L; // open() takes a new parameter
+    public static final long versionID = 6L; // reportBadBlocks added
   
     ///////////////////////////////////////
     // File contents
@@ -142,6 +142,13 @@
      * times before succeeding.
      */
     public boolean complete(String src, String clientName) throws IOException;
+
+    /**
+     * The client wants to report corrupted blocks (blocks with specified
+     * locations on datanodes).
+     * @param blocks Array of located blocks to report
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
 
     ///////////////////////////////////////
     // Namespace management

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Jan 16 15:57:34 2007
@@ -179,7 +179,14 @@
         }
       }
     }
-    
+
+    /**
+     * Report corrupt blocks that were discovered by the client.
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+      namenode.reportBadBlocks(blocks);
+    }
+  
     public short getDefaultReplication() {
       return defaultReplication;
     }
@@ -496,10 +503,12 @@
         private DataInputStream blockStream;
         private Block blocks[] = null;
         private DatanodeInfo nodes[][] = null;
+        private DatanodeInfo currentNode = null;
+        private Block currentBlock = null;
         private long pos = 0;
         private long filelen = 0;
         private long blockEnd = -1;
-
+        
         /**
          */
         public DFSInputStream(String src) throws IOException {
@@ -538,8 +547,24 @@
             }
             this.blocks = newBlocks;
             this.nodes = (DatanodeInfo[][]) nodeV.toArray(new DatanodeInfo[nodeV.size()][]);
+            this.currentNode = null;
+        }
+
+        /**
+         * Returns the datanode from which the stream is currently reading.
+         */
+        public DatanodeInfo getCurrentDatanode() {
+          return currentNode;
+        }
+
+        /**
+         * Returns the block containing the target position. 
+         */
+        public Block getCurrentBlock() {
+          return currentBlock;
         }
 
+
         /**
          * Used by the automatic tests to detemine blocks locations of a
          * file
@@ -623,6 +648,7 @@
 
                     this.pos = target;
                     this.blockEnd = targetBlockEnd;
+                    this.currentBlock = blocks[targetBlock];
                     this.blockStream = in;
                     return chosenNode;
                 } catch (IOException ex) {
@@ -671,7 +697,7 @@
             int result = -1;
             if (pos < filelen) {
                 if (pos > blockEnd) {
-                    blockSeekTo(pos, new TreeSet());
+                   currentNode = blockSeekTo(pos, new TreeSet());
                 }
                 result = blockStream.read();
                 if (result >= 0) {
@@ -691,7 +717,6 @@
             }
             if (pos < filelen) {
               int retries = 2;
-              DatanodeInfo chosenNode = null;
               TreeSet deadNodes = null;
               while (retries > 0) {
                 try {
@@ -699,7 +724,7 @@
                       if (deadNodes == null) {
                         deadNodes = new TreeSet();
                       }
-                      chosenNode = blockSeekTo(pos, deadNodes);
+                      currentNode = blockSeekTo(pos, deadNodes);
                   }
                   int realLen = Math.min(len, (int) (blockEnd - pos + 1));
                   int result = blockStream.read(buf, off, realLen);
@@ -711,7 +736,7 @@
                   LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
                   blockEnd = -1;
                   if (deadNodes == null) { deadNodes = new TreeSet(); }
-                  if (chosenNode != null) { deadNodes.add(chosenNode); }
+                  if (currentNode != null) { deadNodes.add(currentNode); }
                   if (--retries == 0) {
                     throw e;
                   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -24,7 +24,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
 
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
@@ -248,15 +248,51 @@
         return dfs;
     }
     
-    public void reportChecksumFailure(Path f, FSInputStream in,
-                                      long start, long length, int crc) {
+
+    /**
+     * We need to find the blocks that didn't match.  Likely only one 
+     * is corrupt but we will report both to the namenode.  In the future,
+     * we can consider figuring out exactly which block is corrupt.
+     */
+    public void reportChecksumFailure(Path f, 
+                                      FSInputStream in, long inPos, 
+                                      FSInputStream sums, long sumsPos) {
       
-      // ignore for now, causing task to fail, and hope that when task is
-      // retried it gets a different copy of the block that is not corrupt.
+      LocatedBlock lblocks[] = new LocatedBlock[2];
+
+      try {
+        // Find block in data stream.
+        DFSClient.DFSInputStream dfsIn = (DFSClient.DFSInputStream) in;
+        Block dataBlock = dfsIn.getCurrentBlock();
+        if (dataBlock == null) {
+          throw new IOException("Error: Current block in data stream is null! ");
+        }
+        DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
+        lblocks[0] = new LocatedBlock(dataBlock, dataNode);
+        LOG.info("Found checksum error in data stream at block=" + dataBlock.getBlockName() + 
+                 " on datanode=" + dataNode[0].getName());
+
+        // Find block in checksum stream
+        DFSClient.DFSInputStream dfsSums = (DFSClient.DFSInputStream) sums;
+        Block sumsBlock = dfsSums.getCurrentBlock();
+        if (sumsBlock == null) {
+          throw new IOException("Error: Current block in checksum stream is null! ");
+        }
+        DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
+        lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
+        LOG.info("Found checksum error in checksum stream at block=" + sumsBlock.getBlockName() + 
+                 " on datanode=" + sumsNode[0].getName());
+
+        // Ask the namenode (via the client) to delete the blocks.
+        dfs.reportBadBlocks(lblocks);
+
+      } catch (IOException ie) {
+        LOG.info("Found corruption while reading "
+                 + f.toString() 
+                 + ".  Error repairing corrupt blocks.  Bad blocks remain. " 
+                 + StringUtils.stringifyException(ie));
+      }
 
-      // FIXME: we should move the bad block(s) involved to a bad block
-      // directory on their datanode, and then re-replicate the blocks, so that
-      // no data is lost. a task may fail, but on retry it should succeed.
     }
 
     /** Return the total raw capacity of the filesystem, disregarding

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jan 16 15:57:34 2007
@@ -766,6 +766,52 @@
         return true;
     }
 
+    /**
+     * Adds block to list of blocks which will be invalidated on 
+     * specified datanode.
+     */
+    private void addToInvalidates(Block b, DatanodeInfo n) {
+      Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
+      if (invalidateSet == null) {
+        invalidateSet = new ArrayList<Block>();
+        recentInvalidateSets.put(n.getStorageID(), invalidateSet);
+      }
+      invalidateSet.add(b);
+    }
+
+    /**
+     * Invalidates the given block on the given datanode.
+     */
+    public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
+        throws IOException {
+      NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
+                                    + blk.getBlockName() + " on " 
+                                    + dn.getName());
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot invalidate block " + blk.getBlockName(), safeMode);
+      }
+
+      List<DatanodeDescriptor> containingNodes = blocksMap.get(blk);
+
+      // Check how many copies we have of the block.  If we have at least one
+      // other copy on a live node, then we can delete this one.
+      if (containingNodes != null ) {
+        if ((countContainingNodes(containingNodes) > 1) || 
+            ((countContainingNodes(containingNodes) == 1) &&
+             (dn.isDecommissionInProgress() || dn.isDecommissioned()))) {
+          addToInvalidates(blk, dn);
+          removeStoredBlock(blk, getDatanode(dn));
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                        + blk.getBlockName() + " on " 
+                                        + dn.getName() + " listed for deletion.");
+        } else {
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+                                        + blk.getBlockName() + " on " 
+                                        + dn.getName() + " is the only copy and was not deleted.");
+        }
+      }
+    }
+
     ////////////////////////////////////////////////////////////////
     // Here's how to handle block-copy failure during client write:
     // -- As usual, the client's write should result in a streaming
@@ -807,12 +853,7 @@
                 if (containingNodes != null) {
                     for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); ) {
                         DatanodeDescriptor node = it.next();
-                        Collection<Block> invalidateSet = recentInvalidateSets.get(node.getStorageID());
-                        if (invalidateSet == null) {
-                            invalidateSet = new ArrayList<Block>();
-                            recentInvalidateSets.put(node.getStorageID(), invalidateSet);
-                        }
-                        invalidateSet.add(b);
+                        addToInvalidates(b, node);
                         NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
                             + b.getBlockName() + " is added to invalidSet of " + node.getName() );
                     }
@@ -1732,7 +1773,7 @@
         // be-replicated list.
         //
         FSDirectory.INode fileINode = dir.getFileByBlock(block);
-        if( fileINode != null && (containingNodes.size() < fileINode.getReplication())) {
+        if( fileINode != null && (countContainingNodes(containingNodes) < fileINode.getReplication())) {
             synchronized (neededReplications) {
                 neededReplications.add(block);
             }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jan 16 15:57:34 2007
@@ -314,6 +314,25 @@
             throw new IOException("Could not complete write to file " + src + " by " + clientName);
         }
     }
+
+    /**
+     * The client has detected an error on the specified located blocks 
+     * and is reporting them to the server.  For now, the namenode will 
+     * delete the blocks from the datanodes.  In the future we might 
+     * check the blocks are actually corrupt. 
+     */
+    public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+      stateChangeLog.debug("*DIR* NameNode.reportBadBlocks");
+      for (int i = 0; i < blocks.length; i++) {
+        Block blk = blocks[i].getBlock();
+        DatanodeInfo[] nodes = blocks[i].getLocations();
+        for (int j = 0; j < nodes.length; j++) {
+          DatanodeInfo dn = nodes[j];
+          namesystem.invalidateBlock(blk, dn);
+        }
+      }
+    }
+
     /**
      */
     public String[][] getHints(String src, long start, long len) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Tue Jan 16 15:57:34 2007
@@ -46,6 +46,7 @@
     private FSDataInputStream sums;
     private Checksum sum = new CRC32();
     private int inSum;
+    private FSInputStream sumsIn;
 
     public Checker(FileSystem fs, Path file, Configuration conf)
       throws IOException {
@@ -55,7 +56,8 @@
       this.file = file;
       Path sumFile = FileSystem.getChecksumFile(file);
       try {
-        this.sums = new FSDataInputStream(fs.openRaw(sumFile), conf);
+        sumsIn = fs.openRaw(sumFile);
+        this.sums = new FSDataInputStream(sumsIn, conf);
         byte[] version = new byte[VERSION.length];
         sums.readFully(version);
         if (!Arrays.equals(version, VERSION))
@@ -134,7 +136,7 @@
       if (crc != sumValue) {
         long pos = getPos() - delta;
         fs.reportChecksumFailure(file, (FSInputStream)in,
-                                 pos, bytesPerSum, crc);
+                                 pos, sumsIn, pos/bytesPerSum) ;
         throw new ChecksumException("Checksum error: "+file+" at "+pos);
       }
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Jan 16 15:57:34 2007
@@ -879,13 +879,13 @@
      * Report a checksum error to the file system.
      * @param f the file name containing the error
      * @param in the stream open on the file
-     * @param start the position of the beginning of the bad data in the file
-     * @param length the length of the bad data in the file
-     * @param crc the expected CRC32 of the data
+     * @param inPos the position of the beginning of the bad data in the file
+     * @param sums the stream open on the checksum file
+     * @param sumsPos the position of the beginning of the bad data in the checksum file
      */
-    public abstract void reportChecksumFailure(Path f, FSInputStream in,
-                                               long start, long length,
-                                               int crc);
+    public abstract void reportChecksumFailure(Path f, 
+                                               FSInputStream in, long inPos, 
+                                               FSInputStream sums, long sumsPos);
 
     /**
      * Get the size for a particular file.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Jan 16 15:57:34 2007
@@ -367,7 +367,8 @@
     /** Moves files to a bad file directory on the same device, so that their
      * storage will not be reused. */
     public void reportChecksumFailure(Path p, FSInputStream in,
-                                      long start, long length, int crc) {
+                                      long inPos,
+                                      FSInputStream sums, long sumsPos) {
       try {
         // canonicalize f   
         File f = pathToFile(p).getCanonicalFile();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Tue Jan 16 15:57:34 2007
@@ -289,8 +289,9 @@
   }
 
   @Override
-  public void reportChecksumFailure(Path path, FSInputStream in,
-      long start, long length, int crc) {
+  public void reportChecksumFailure(Path f, 
+                                    FSInputStream in, long inPos, 
+                                    FSInputStream sums, long sumsPos) {
     // TODO: What to do here?
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=496897&r1=496896&r2=496897
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Jan 16 15:57:34 2007
@@ -388,9 +388,11 @@
  }
 
   @Override
-  public void reportChecksumFailure(
-      Path f, FSInputStream in, long start, long length, int crc) {
-    baseFS.reportChecksumFailure(f, in, start, length, crc); 
+
+  public void reportChecksumFailure(Path f, 
+                                    FSInputStream in, long inPos, 
+                                    FSInputStream sums, long sumsPos) {
+    baseFS.reportChecksumFailure(f, in, inPos, sums, sumsPos); 
   }
 
   @Override