Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/26 22:49:39 UTC

svn commit: r500370 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/

Author: cutting
Date: Fri Jan 26 13:49:38 2007
New Revision: 500370

URL: http://svn.apache.org/viewvc?view=rev&rev=500370
Log:
HADOOP-731.  When a checksum error is encountered on a file stored in HDFS, try to find another replica.  Contributed by Wendy.
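
In outline, the change works like this: a checked read that fails verification rewinds to the offset where it started, asks the stream for a different replica via the new seekToNewSource() method, and retries. Below is a minimal sketch of that pattern, not the actual Hadoop classes: ReplicaAwareStream, readChecked(), and readWithRetry() are illustrative names, and a plain IOException stands in for Hadoop's ChecksumException (which the real code catches specifically).

import java.io.IOException;

abstract class ReplicaAwareStream {
  abstract long getPos() throws IOException;

  // Mirrors the new FSInputStream.seekToNewSource(): reposition at targetPos
  // on a different copy of the data; returns true only if another source
  // was actually found.
  abstract boolean seekToNewSource(long targetPos) throws IOException;

  // A read that verifies checksums, throwing on a mismatch.
  abstract int readChecked(byte[] b, int off, int len) throws IOException;

  int readWithRetry(byte[] b, int off, int len) throws IOException {
    int retriesLeft = 3;      // same bound the commit uses
    long oldPos = getPos();   // offset to return to after a failure
    while (true) {
      retriesLeft--;
      try {
        return readChecked(b, off, len);
      } catch (IOException checksumError) {
        // Give up when out of retries or when no other replica exists.
        if (retriesLeft == 0 || !seekToNewSource(oldPos)) {
          throw checksumError;
        }
        // A different replica is now current and positioned at oldPos;
        // loop around and try the read again.
      }
    }
  }
}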

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 13:49:38 2007
@@ -63,6 +63,10 @@
 19. HADOOP-909.  Fix the 'du' command to correctly compute the size of
     FileSystem directory trees.  (Hairong Kuang via cutting)
 
+20. HADOOP-731.  When a checksum error is encountered on a file stored
+    in HDFS, try another replica of the data, if any.
+    (Wendy Chien via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Jan 26 13:49:38 2007
@@ -618,7 +618,7 @@
                 DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
                 chosenNode = retval.info;
                 InetSocketAddress targetAddr = retval.addr;
-            
+
                 try {
                     s = new Socket();
                     s.connect(targetAddr, READ_TIMEOUT);
@@ -764,7 +764,7 @@
               if (nodes[blockId] == null || nodes[blockId].length == 0) {
                 LOG.info("No node available for block: " + blockInfo);
               }
-              LOG.info("Could not obtain block from any node:  " + ie);
+              LOG.info("Could not obtain block " + blockId + " from any node:  " + ie);
               try {
                 Thread.sleep(3000);
               } catch (InterruptedException iex) {
@@ -889,6 +889,24 @@
             blockEnd = -1;
         }
 
+        /**
+         * Seek to the given position on a node other than the current node.
+         * Returns true if a different node was found and is now the current
+         * node; returns false if no other node could be found.
+         */
+        public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+            TreeSet excludeNodes = new TreeSet();       
+            excludeNodes.add(currentNode);
+            String oldNodeID = currentNode.getStorageID();
+            DatanodeInfo newNode = blockSeekTo(targetPos, excludeNodes); 
+            if (!oldNodeID.equals(newNode.getStorageID())) {
+                currentNode = newNode;
+                return true;
+            } else {
+                return false;
+            }
+        }
+        
         /**
          */
         public synchronized long getPos() throws IOException {

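Note that seekToNewSource() relies on blockSeekTo() taking a set of nodes to exclude, and that node selection may still fall back to the current node when no other replica is reachable; that is why the method compares storage IDs rather than trusting the choice. A small sketch of that selection-by-exclusion idea, where ReplicaChooser and chooseFrom() are hypothetical stand-ins for the DFSClient internals:

import java.util.List;
import java.util.Set;

class ReplicaChooser {
  // Return the ID of the first replica not in the excluded set, falling
  // back to the first replica overall when everything is excluded.  The
  // caller detects the fallback by comparing storage IDs, exactly as
  // seekToNewSource() does above.
  static String chooseFrom(List<String> replicaIds, Set<String> excluded) {
    for (String id : replicaIds) {
      if (!excluded.contains(id)) {
        return id;
      }
    }
    return replicaIds.get(0);
  }
}
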
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Fri Jan 26 13:49:38 2007
@@ -90,34 +90,61 @@
     }
     
     public int read(byte b[], int off, int len) throws IOException {
-      int read = in.read(b, off, len);
+      int read;
+      boolean retry;
+      int retriesLeft = 3;
+      long oldPos = getPos();
+      do {
+        retriesLeft--;
+        retry = false;
 
-      if (sums != null) {
-        int summed = 0;
-        while (summed < read) {
-          
-          int goal = bytesPerSum - inSum;
-          int inBuf = read - summed;
-          int toSum = inBuf <= goal ? inBuf : goal;
-          
+        read = in.read(b, off, len);
+        
+        if (sums != null) {
+          long oldSumsPos = sums.getPos();
           try {
-            sum.update(b, off+summed, toSum);
-          } catch (ArrayIndexOutOfBoundsException e) {
-            throw new RuntimeException("Summer buffer overflow b.len=" + 
-                                       b.length + ", off=" + off + 
-                                       ", summed=" + summed + ", read=" + 
-                                       read + ", bytesPerSum=" + bytesPerSum +
-                                       ", inSum=" + inSum, e);
-          }
-          summed += toSum;
+            int summed = 0;
+            while (summed < read) {
+              int goal = bytesPerSum - inSum;
+              int inBuf = read - summed;
+              int toSum = inBuf <= goal ? inBuf : goal;
           
-          inSum += toSum;
-          if (inSum == bytesPerSum) {
-            verifySum(read-(summed-bytesPerSum));
+              try {
+                sum.update(b, off+summed, toSum);
+              } catch (ArrayIndexOutOfBoundsException e) {
+                throw new RuntimeException("Summer buffer overflow b.len=" + 
+                                           b.length + ", off=" + off + 
+                                           ", summed=" + summed + ", read=" + 
+                                           read + ", bytesPerSum=" + bytesPerSum +
+                                           ", inSum=" + inSum, e);
+              }
+              summed += toSum;
+          
+              inSum += toSum;
+              if (inSum == bytesPerSum) {
+                verifySum(read-(summed-bytesPerSum));
+              }
+            }
+          } catch (ChecksumException ce) {
+            LOG.info("Found checksum error: " + StringUtils.stringifyException(ce));
+            if (retriesLeft == 0) {
+              throw ce;
+            }
+            sums.seek(oldSumsPos);
+            if (!((FSInputStream)in).seekToNewSource(oldPos) ||
+                !((FSInputStream)sumsIn).seekToNewSource(oldSumsPos)) {
+              // At least one of the two streams could not switch to a
+              // different source, so retrying would hit the same checksum
+              // error.  Throw the exception instead.
+              throw ce;
+            } else {
+              // Both streams are now reading from different sources, so the
+              // read may succeed; retry.
+              retry = true;
+            }
           }
         }
-      }
-        
+      } while (retry);
       return read;
     }
 
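Two details of the retry loop above are easy to miss. First, retriesLeft starts at 3 and is decremented before each attempt, so a persistent checksum error is rethrown after the third failed read. Second, recovery repositions both the data stream and the checksum stream, and the || short-circuits: if the data stream cannot switch replicas, the checksum stream is not even asked. A condensed sketch of that recovery test, where RecoverySketch and switchSources() are illustrative, not part of this commit:

import java.io.IOException;
import org.apache.hadoop.fs.FSInputStream;

class RecoverySketch {
  // Equivalent to the negated || test in the catch block above: retry only
  // if BOTH streams moved to a new source.  && short-circuits the same way
  // the original || does, so sumsIn is untouched when dataIn has no
  // alternative replica.
  static boolean switchSources(FSInputStream dataIn, FSInputStream sumsIn,
                               long dataPos, long sumsPos) throws IOException {
    return dataIn.seekToNewSource(dataPos)
        && sumsIn.seekToNewSource(sumsPos);
  }
}
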
@@ -270,7 +297,11 @@
   public FSDataInputStream(FileSystem fs, Path file, int bufferSize, Configuration conf)
       throws IOException {
     super(null);
-    this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+    Checker chkr = new Checker(fs, file, conf);  // sets bytesPerSum
+    if (bufferSize % bytesPerSum != 0) {
+      throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+    }
+    this.in = new Buffer(new PositionCache(chkr), bufferSize);
   }
   
   
@@ -278,7 +309,11 @@
     throws IOException {
     super(null);
     int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-    this.in = new Buffer(new PositionCache(new Checker(fs, file, conf)), bufferSize);
+    Checker chkr = new Checker(fs, file, conf);
+    if (bufferSize % bytesPerSum != 0) {
+      throw new IOException("Buffer size must be multiple of " + bytesPerSum);
+    }
+    this.in = new Buffer(new PositionCache(chkr), bufferSize);
   }
     
   /** Construct without checksums. */

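Both constructors now reject buffer sizes that are not a multiple of bytesPerSum, keeping buffer refills aligned to checksum-chunk boundaries so a failed chunk can be rewound and re-read cleanly. bytesPerSum is taken from the filesystem's checksum configuration (io.bytes.per.checksum; treating its default as 512 bytes is an assumption about the deployment). A caller-side sketch for picking a legal size:

class BufferSizing {
  // Round a desired buffer size up to the next multiple of bytesPerSum so
  // the constructor check above passes.  alignBufferSize() is an
  // illustrative helper, not part of this commit.
  static int alignBufferSize(int desired, int bytesPerSum) {
    if (desired <= bytesPerSum) {
      return bytesPerSum;                      // smallest legal size
    }
    int rem = desired % bytesPerSum;
    return rem == 0 ? desired : desired + (bytesPerSum - rem);
  }
}

With the defaults quoted in the diff (io.file.buffer.size of 4096 and 512-byte checksum chunks), 4096 % 512 == 0, so the common configuration already passes the check.
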
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Fri Jan 26 13:49:38 2007
@@ -38,7 +38,13 @@
      * Return the current offset from the start of the file
      */
     public abstract long getPos() throws IOException;
-    
+
+    /**
+     * Seeks a different copy of the data.  Returns true if
+     * a new source was found, false otherwise.
+     */
+    public abstract boolean seekToNewSource(long targetPos) throws IOException;
+
     public int read(long position, byte[] buffer, int offset, int length)
     throws IOException {
       synchronized (this) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Fri Jan 26 13:49:38 2007
@@ -114,6 +114,10 @@
       din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
     }
     
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
     public int available() throws IOException {
       return din.available(); 
     }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Fri Jan 26 13:49:38 2007
@@ -95,6 +95,10 @@
           return fis.getChannel().position();
         }
 
+        public boolean seekToNewSource(long targetPos) throws IOException {
+          return false;
+        }
+
         /*
          * Just forward to the fis
          */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?view=diff&rev=500370&r1=500369&r2=500370
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Fri Jan 26 13:49:38 2007
@@ -61,6 +61,11 @@
   }
 
   @Override
+  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
   public synchronized int read() throws IOException {
     if (closed) {
       throw new IOException("Stream closed");