You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/07/01 10:37:30 UTC

svn commit: r959555 - in /hadoop/common/branches/branch-0.20-append: ./ src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apach...

Author: dhruba
Date: Thu Jul  1 08:37:30 2010
New Revision: 959555

URL: http://svn.apache.org/viewvc?rev=959555&view=rev
Log:
HDFS-1057.  Concurrent readers hit ChecksumExceptions if following 
a writer to very end of file (Sam Rash via dhruba)


Added:
    hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Thu Jul  1 08:37:30 2010
@@ -19,6 +19,9 @@ Release 0.20-append - Unreleased
     HDFS-457. Better handling of volume failure in DataNode Storage.
     (Nicolas Spiegelberg via dhruba)
 
+    HDFS-1057.  Concurrent readers hit ChecksumExceptions if following 
+    a writer to very end of file (Sam Rash via dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java Thu Jul  1 08:37:30 2010
@@ -20,6 +20,8 @@ package org.apache.hadoop.io;
 
 import java.io.*;
 import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 
 import org.apache.commons.logging.Log;
 
@@ -105,6 +107,28 @@ public class IOUtils {
       off += ret;
     }
   }
+
+  /** Reads len bytes in a loop using the channel of the stream
+   * @param fileChannel a FileChannel to read len bytes into buf
+   * @param buf The buffer to fill
+   * @param off offset from the buffer
+   * @param len the length of bytes to read
+   * @throws IOException if it could not read requested number of bytes 
+   * for any reason (including EOF)
+   */
+  public static void readFileChannelFully( FileChannel fileChannel, byte buf[],
+      int off, int len ) throws IOException {
+    int toRead = len;
+    ByteBuffer byteBuffer = ByteBuffer.wrap(buf, off, len);
+    while ( toRead > 0 ) {
+      int ret = fileChannel.read(byteBuffer);
+      if ( ret < 0 ) {
+        throw new IOException( "Premeture EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
   
   /** Similar to readFully(). Skips bytes in a loop.
    * @param in The InputStream to skip bytes from

Added: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java Thu Jul  1 08:37:30 2010
@@ -0,0 +1,40 @@
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+
+public class ChecksumUtil {
+  /**
+   * updates the checksum for a buffer
+   * 
+   * @param buf - buffer to update the checksum in
+   * @param chunkOff - offset in the buffer where the checksum is to update
+   * @param dataOff - offset in the buffer of the data
+   * @param dataLen - length of data to compute checksum on
+   */
+  public static void updateChunkChecksum(
+    byte[] buf,
+    int checksumOff,
+    int dataOff, 
+    int dataLen,
+    DataChecksum checksum
+  ) throws IOException {
+    int bytesPerChecksum = checksum.getBytesPerChecksum();
+    int checksumSize = checksum.getChecksumSize();
+    int curChecksumOff = checksumOff;
+    int curDataOff = dataOff;
+    int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
+    int dataLeft = dataLen;
+    
+    for (int i = 0; i < numChunks; i++) {
+      int len = Math.min(dataLeft, bytesPerChecksum);
+      
+      checksum.reset();
+      checksum.update(buf, curDataOff, len);
+      checksum.writeValue(buf, curChecksumOff, false);
+      
+      curDataOff += len;
+      curChecksumOff += checksumSize;
+      dataLeft -= len;
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jul  1 08:37:30 2010
@@ -480,7 +480,10 @@ class BlockReceiver implements java.io.C
 
     /// flush entire packet before sending ack
     flush();
-
+    
+    // update length only after flush to disk
+    datanode.data.setVisibleLength(block, offsetInBlock);
+    
     // put in queue for pending acks
     if (responder != null) {
       ((PacketResponder)responder.getRunnable()).enqueue(seqno,

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Jul  1 08:37:30 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ChecksumUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -64,6 +65,7 @@ class BlockSender implements java.io.Clo
   private boolean verifyChecksum; //if true, check is verified while reading
   private BlockTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
+  private final MemoizedBlock memoizedBlock;
 
   /**
    * Minimum buffer used while sending data to clients. Used only if
@@ -89,7 +91,7 @@ class BlockSender implements java.io.Clo
       this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
-      this.blockLength = datanode.data.getLength(block);
+      this.blockLength = datanode.data.getVisibleLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
 
@@ -164,6 +166,7 @@ class BlockSender implements java.io.Clo
       seqno = 0;
 
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);
@@ -234,6 +237,15 @@ class BlockSender implements java.io.Clo
 
     int len = Math.min((int) (endOffset - offset),
                        bytesPerChecksum*maxChunks);
+    
+    // truncate len so that any partial chunks will be sent as a final packet.
+    // this is not necessary for correctness, but partial chunks are 
+    // ones that may be recomputed and sent via buffer copy, so try to minimize
+    // those bytes
+    if (len > bytesPerChecksum && len % bytesPerChecksum != 0) {
+      len -= len % bytesPerChecksum;
+    }
+    
     if (len == 0) {
       return 0;
     }
@@ -298,32 +310,54 @@ class BlockSender implements java.io.Clo
           cOff += checksumSize;
         }
       }
-      //writing is done below (mainly to handle IOException)
-    }
-    
-    try {
-      if (blockInPosition >= 0) {
+      
+      // only recompute checksum if we can't trust the meta data due to 
+      // concurrent writes
+      if (memoizedBlock.hasBlockChanged(len)) {
+        ChecksumUtil.updateChunkChecksum(
+          buf, checksumOff, dataOff, len, checksum
+        );
+      }
+      
+      try {
+        out.write(buf, 0, dataOff + len);
+      } catch (IOException e) {
+        throw ioeToSocketException(e);
+      }
+    } else {
+      try {
         //use transferTo(). Checks on out and blockIn are already done. 
+        SocketOutputStream sockOut = (SocketOutputStream) out;
+        FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();
 
-        SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
-        // no need to flush. since we know out is not a buffered stream. 
-
-        sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
-                                blockInPosition, len);
+        if (memoizedBlock.hasBlockChanged(len)) {
+          fileChannel.position(blockInPosition);
+          IOUtils.readFileChannelFully(
+            fileChannel,
+            buf,
+            dataOff,
+            len
+          );
+          
+          ChecksumUtil.updateChunkChecksum(
+            buf, checksumOff, dataOff, len, checksum
+          );          
+          sockOut.write(buf, 0, dataOff + len);
+        } else {
+          //first write the packet
+          sockOut.write(buf, 0, dataOff);
+          // no need to flush. since we know out is not a buffered stream.
+          sockOut.transferToFully(fileChannel, blockInPosition, len);
+        }
 
         blockInPosition += len;
-      } else {
-        // normal transfer
-        out.write(buf, 0, dataOff + len);
-      }
-      
-    } catch (IOException e) {
+
+      } catch (IOException e) {
       /* exception while writing to the client (well, with transferTo(),
        * it could also be while reading from the local file).
        */
-      throw ioeToSocketException(e);
+        throw ioeToSocketException(e);
+      }
     }
 
     if (throttler != null) { // rebalancing so throttle
@@ -382,12 +416,13 @@ class BlockSender implements java.io.Clo
         streamForSendChunks = baseStream;
         
         // assure a mininum buffer size.
-        maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
+        maxChunksPerPacket = (Math.max(BUFFER_SIZE,
                                        MIN_BUFFER_WITH_TRANSFERTO)
                               + bytesPerChecksum - 1)/bytesPerChecksum;
         
-        // allocate smaller buffer while using transferTo(). 
-        pktSize += checksumSize * maxChunksPerPacket;
+        // packet buffer has to be able to do a normal transfer in the case
+        // of recomputing checksum
+        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
       } else {
         maxChunksPerPacket = Math.max(1,
                  (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
@@ -410,7 +445,13 @@ class BlockSender implements java.io.Clo
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
       }
-    } finally {
+    }
+    catch (RuntimeException e) {
+      LOG.error("unexpected exception sending block", e);
+      
+      throw new IOException("unexpected runtime exception", e);
+    } 
+    finally {
       if (clientTraceFmt != null) {
         ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
       }
@@ -425,4 +466,49 @@ class BlockSender implements java.io.Clo
   boolean isBlockReadFully() {
     return blockReadFully;
   }
+
+  /**
+   * helper class used to track if a block's meta data is verifiable or not
+   */
+  private class MemoizedBlock {
+    // block data stream 
+    private InputStream inputStream;
+    //  visible block length
+    private long blockLength;
+    private final FSDatasetInterface fsDataset;
+    private final Block block;
+
+    private MemoizedBlock(
+      InputStream inputStream,
+      long blockLength,
+      FSDatasetInterface fsDataset, Block block) {
+      this.inputStream = inputStream;
+      this.blockLength = blockLength;
+      this.fsDataset = fsDataset;
+      this.block = block;
+    }
+
+    // logic: if we are starting or ending on a partial chunk and the block
+    // has more data than we were told at construction, the block has 'changed'
+    // in a way that we care about (ie, we can't trust crc data) 
+    boolean hasBlockChanged(long dataLen) throws IOException {
+      // check if we are using transferTo since we tell if the file has changed
+      // (blockInPosition >= 0 => we are using transferTo and File Channels
+      if (BlockSender.this.blockInPosition >= 0) {
+        long currentLength = ((FileInputStream) inputStream).getChannel().size();
+        
+        return (blockInPosition % bytesPerChecksum != 0 || 
+            dataLen % bytesPerChecksum != 0) &&
+          currentLength > blockLength;
+
+      } else {
+        long currentLength = fsDataset.getLength(block);
+        
+        // offset is the offset into the block
+        return (BlockSender.this.offset % bytesPerChecksum != 0 || 
+            dataLen % bytesPerChecksum != 0) &&
+            currentLength > blockLength;
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jul  1 08:37:30 2010
@@ -709,14 +709,15 @@ public class FSDataset implements FSCons
   //Find better place?
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
-    
+  
 
   static class ActiveFile {
     final File file;
     final List<Thread> threads = new ArrayList<Thread>(2);
+    private volatile long visibleLength;
 
     ActiveFile(File f, List<Thread> list) {
-      file = f;
+      this(f);
       if (list != null) {
         threads.addAll(list);
       }
@@ -726,13 +727,22 @@ public class FSDataset implements FSCons
     // no active threads associated with this ActiveFile
     ActiveFile(File f) {
       file = f;
+      visibleLength = f.length();
     }
-    
+
+    public long getVisibleLength() {
+      return visibleLength;
+    }
+
+    public void setVisibleLength(long value) {
+      visibleLength = value;
+    }
+
     public String toString() {
       return getClass().getSimpleName() + "(file=" + file
           + ", threads=" + threads + ")";
     }
-  } 
+  }
   
   static String getMetaFileName(String blockFileName, long genStamp) {
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -782,7 +792,7 @@ public class FSDataset implements FSCons
   }
 
   /** Return the block file for the given ID */ 
-  public File findBlockFile(long blockId) {
+  public synchronized File findBlockFile(long blockId) {
     final Block b = new Block(blockId);
     File blockfile = null;
     ActiveFile activefile = ongoingCreates.get(b);
@@ -808,7 +818,8 @@ public class FSDataset implements FSCons
       return null;
     }
     File metafile = findMetaFile(blockfile);
-    return new Block(blkid, blockfile.length(),
+    Block block = new Block(blkid);
+    return new Block(blkid, getVisibleLength(block),
         parseGenerationStamp(blockfile, metafile));
   }
 
@@ -883,6 +894,31 @@ public class FSDataset implements FSCons
     return getBlockFile(b).length();
   }
 
+  @Override
+  public synchronized long getVisibleLength(Block b) throws IOException {
+    ActiveFile activeFile = ongoingCreates.get(b);
+
+    if (activeFile != null) {
+      return activeFile.getVisibleLength();
+    } else {
+      return getLength(b);
+    }
+  }
+
+  @Override
+  public synchronized void setVisibleLength(Block b, long length) 
+    throws IOException {
+    ActiveFile activeFile = ongoingCreates.get(b);
+
+    if (activeFile != null) {
+      activeFile.setVisibleLength(length);
+    } else {
+      throw new IOException(
+        String.format("block %s is not being written to", b)
+      );
+    }
+  }
+
   /**
    * Get File name for a given block.
    */

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Jul  1 08:37:30 2010
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.DiskChecke
  *
  */
 public interface FSDatasetInterface extends FSDatasetMBean {
-  
+
   
   /**
    * Returns the length of the metadata file of the specified block
@@ -49,7 +49,7 @@ public interface FSDatasetInterface exte
    * @throws IOException
    */
   public long getMetaDataLength(Block b) throws IOException;
-  
+
   /**
    * This class provides the input stream and length of the metadata
    * of a block
@@ -94,6 +94,25 @@ public interface FSDatasetInterface exte
   public long getLength(Block b) throws IOException;
 
   /**
+   * Returns the specified block's visible length (has metadata for this)
+   * @param b
+   * @return   the specified block's visible length
+   * @throws IOException
+   */
+  public long getVisibleLength(Block b) throws IOException;
+
+  /**
+   * update the specified blocks visible meta data.  NOTE: only applies
+   * to blocks that are being written to.  If called on closed blocks,
+   * throws IOException
+   * 
+   * @param b block to update the length for
+   * @param length value to set visible length to
+   * @throws IOException if the block is not in ongoingCreates
+   */
+  public void setVisibleLength(Block b, long length) throws IOException;
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jul  1 08:37:30 2010
@@ -831,18 +831,28 @@ public class FSNamesystem implements FSC
             blocks[curBlk] + "blockMap has " + numCorruptNodes + 
             " but corrupt replicas map has " + numCorruptReplicas);
       }
-      boolean blockCorrupt = (numCorruptNodes == numNodes);
-      int numMachineSet = blockCorrupt ? numNodes : 
+      DatanodeDescriptor[] machineSet = null;
+      boolean blockCorrupt = false;
+      if (inode.isUnderConstruction() && curBlk == blocks.length - 1
+          && blocksMap.numNodes(blocks[curBlk]) == 0) {
+        // get unfinished block locations
+        INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
+        machineSet = cons.getTargets();
+        blockCorrupt = false;
+      } else {
+        blockCorrupt = (numCorruptNodes == numNodes);
+        int numMachineSet = blockCorrupt ? numNodes : 
                             (numNodes - numCorruptNodes);
-      DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
-      if (numMachineSet > 0) {
-        numNodes = 0;
-        for(Iterator<DatanodeDescriptor> it = 
-            blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
-          DatanodeDescriptor dn = it.next();
-          boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
-          if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
-            machineSet[numNodes++] = dn;
+        machineSet = new DatanodeDescriptor[numMachineSet];
+        if (numMachineSet > 0) {
+          numNodes = 0;
+          for(Iterator<DatanodeDescriptor> it = 
+              blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
+            DatanodeDescriptor dn = it.next();
+            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
+            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
+              machineSet[numNodes++] = dn;
+          }
         }
       }
       results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,

Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java Thu Jul  1 08:37:30 2010
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * This class tests the cases of a concurrent reads/writes to a file;
+ * ie, one writer and one or more readers can see unfinsihed blocks
+ */
+public class TestFileConcurrentReader extends junit.framework.TestCase {
+
+  private enum SyncType
+  {
+    SYNC,
+    APPEND,
+  }
+  
+  
+  private static final Logger LOG = 
+    Logger.getLogger(TestFileConcurrentReader.class);
+  
+  {
+    ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  private static final int DEFAULT_WRITE_SIZE = 1024 + 1;
+  private static final int SMALL_WRITE_SIZE = 61;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FileSystem fileSystem;
+
+  // creates a file but does not close it
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+    FSDataOutputStream stm = fileSys.create(name, true,
+      fileSys.getConf().getInt("io.file.buffer.size", 4096),
+      (short) repl, (long) blockSize);
+    return stm;
+  }
+  
+  @Before
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    init(conf);    
+  }
+  
+  private void init(Configuration conf) throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitClusterUp();
+    fileSystem = cluster.getFileSystem();
+  }
+  
+  private void writeFileAndSync(FSDataOutputStream stm, int size)
+    throws IOException {
+//    byte[] buffer = AppendTestUtil.randomBytes(seed, size);
+    byte[] buffer = generateSequentialBytes(0, size);
+    stm.write(buffer, 0, size);
+    stm.sync();
+  }
+
+  private void checkCanRead(FileSystem fileSys, Path path, int numBytes)
+    throws IOException {
+    waitForBlocks(fileSys, path);
+    assertBytesAvailable(fileSys, path, numBytes);
+  }
+
+  // make sure bytes are available and match expected
+  private void assertBytesAvailable(
+    FileSystem fileSystem, 
+    Path path, 
+    int numBytes
+  ) throws IOException {
+    byte[] buffer = new byte[numBytes];
+    FSDataInputStream inputStream = fileSystem.open(path);
+    IOUtils.readFully(inputStream, buffer, 0, numBytes);
+
+    assertTrue(
+      "unable to validate bytes",
+      validateSequentialBytes(buffer, 0, numBytes)
+    );
+  }
+
+  private void waitForBlocks(FileSystem fileSys, Path name)
+    throws IOException {
+    // wait until we have at least one block in the file to read.
+    boolean done = false;
+
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+      done = true;
+      BlockLocation[] locations = fileSys.getFileBlockLocations(
+        fileSys.getFileStatus(name), 0, blockSize);
+      if (locations.length < 1) {
+        done = false;
+        continue;
+      }
+    }
+  }
+
+  /**
+   * Test that that writes to an incomplete block are available to a reader
+   */
+  public void testUnfinishedBlockRead()
+    throws IOException {
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      System.out.println("Path : \"" + path.toString() + "\"");
+      System.out.println(fileSystem.getFileStatus(path).isDir());
+      assertTrue("/ should be a directory",
+        fileSystem.getFileStatus(path).isDir());
+
+      // create a new file in the root, write data, do no close
+      Path file1 = new Path("/unfinished-block");
+      FSDataOutputStream stm = TestFileCreation.createFile(fileSystem, file1, 1);
+
+      // write partial block and sync
+      int partialBlockSize = blockSize / 2;
+      writeFileAndSync(stm, partialBlockSize);
+
+      // Make sure a client can read it before it is closed
+      checkCanRead(fileSystem, file1, partialBlockSize);
+
+      stm.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * test case: if the BlockSender decides there is only one packet to send,
+   * the previous computation of the pktSize based on transferToAllowed
+   * would result in too small a buffer to do the buffer-copy needed
+   * for partial chunks.  
+   */
+  public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
+    try {
+      // check that / exists
+      Path path = new Path("/");
+      System.out.println("Path : \"" + path.toString() + "\"");
+      System.out.println(fileSystem.getFileStatus(path).isDir());
+      assertTrue("/ should be a directory",
+        fileSystem.getFileStatus(path).isDir());
+
+      // create a new file in the root, write data, do no close
+      Path file1 = new Path("/unfinished-block");
+      final FSDataOutputStream stm = 
+        TestFileCreation.createFile(fileSystem, file1, 1);
+
+      // write partial block and sync
+      final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+      final int partialBlockSize = bytesPerChecksum - 1;
+      
+      writeFileAndSync(stm, partialBlockSize);
+
+      // Make sure a client can read it before it is closed
+      checkCanRead(fileSystem, file1, partialBlockSize);
+
+      stm.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // for some reason, using tranferTo evokes the race condition more often
+  // so test separately
+  public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite() 
+    throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+
+  // fails due to issue w/append, disable 
+  public void _testUnfinishedBlockCRCErrorTransferToAppend() throws IOException {
+    runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+  
+  public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+  }
+
+  public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite() 
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
+  }
+  
+  // fails due to issue w/append, disable 
+  public void _testUnfinishedBlockCRCErrorNormalTransferAppend() 
+    throws IOException {
+    runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+  }
+  
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed, SyncType syncType, int writeSize
+  ) throws IOException {
+      runTestUnfinishedBlockCRCError(
+        transferToAllowed, syncType, writeSize, new Configuration()
+      );
+  }
+  
+  private void runTestUnfinishedBlockCRCError(
+    final boolean transferToAllowed, 
+    final SyncType syncType,
+    final int writeSize,
+    Configuration conf
+  ) throws IOException {
+    try {
+      conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
+      conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
+      init(conf);
+
+      final Path file = new Path("/block-being-written-to");
+      final int numWrites = 2000;
+      final AtomicBoolean writerDone = new AtomicBoolean(false);
+      final AtomicBoolean writerStarted = new AtomicBoolean(false);
+      final AtomicBoolean error = new AtomicBoolean(false);
+      final FSDataOutputStream initialOutputStream = fileSystem.create(file);
+      Thread writer = new Thread(new Runnable() {
+        private FSDataOutputStream outputStream = initialOutputStream;
+
+        @Override
+        public void run() {
+          try {
+            for (int i = 0; !error.get() && i < numWrites; i++) {
+              try {
+                final byte[] writeBuf = 
+                  generateSequentialBytes(i * writeSize, writeSize);              
+                outputStream.write(writeBuf);
+                if (syncType == SyncType.SYNC) {
+                  outputStream.sync();
+                } else { // append
+                  outputStream.close();
+                  outputStream = fileSystem.append(file);
+                }
+                writerStarted.set(true);
+              } catch (IOException e) {
+                error.set(true);
+                LOG.error(String.format("error writing to file"));
+              } 
+            }
+            
+            outputStream.close();
+            writerDone.set(true);
+          } catch (Exception e) {
+            LOG.error("error in writer", e);
+            
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      Thread tailer = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            long startPos = 0;
+            while (!writerDone.get() && !error.get()) {
+              if (writerStarted.get()) {
+                try {
+                  startPos = tailFile(file, startPos);
+                } catch (IOException e) {
+                  LOG.error(String.format("error tailing file %s", file), e);
+  
+                  throw new RuntimeException(e);
+                }
+              }
+            }
+          } catch (RuntimeException e) {
+            if (e.getCause() instanceof ChecksumException) {
+              error.set(true);
+            }
+            
+            LOG.error("error in tailer", e);
+            throw e;
+          }
+        }
+      });
+
+      writer.start();
+      tailer.start();
+
+      try {
+        writer.join();
+        tailer.join();
+        
+        assertFalse(
+          "error occurred, see log above", error.get()
+        );
+      } catch (InterruptedException e) {
+        LOG.info("interrupted waiting for writer or tailer to complete");
+        
+        Thread.currentThread().interrupt();
+      }
+      initialOutputStream.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  private byte[] generateSequentialBytes(int start, int length) {
+    byte[] result = new byte[length];
+
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte)((start + i) % 127);
+    }
+    
+    return result;
+  }
+  
+  private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
+    for (int i = 0; i < len; i++) {
+      int expected = (i + startPos) % 127;
+      
+      if (buf[i] % 127 != expected) {
+        LOG.error(String.format("at position [%d], got [%d] and expected [%d]", 
+          startPos, buf[i], expected));
+        
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  private long tailFile(Path file, long startPos) throws IOException {
+    long numRead = 0;
+    FSDataInputStream inputStream = fileSystem.open(file);
+    inputStream.seek(startPos);
+
+    int len = 4 * 1024;
+    byte[] buf = new byte[len];
+    int read;
+    while ((read = inputStream.read(buf)) > -1) {
+      LOG.info(String.format("read %d bytes", read));
+      
+      if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
+        LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
+        throw new ChecksumException(
+          String.format("unable to validate bytes"), 
+          startPos
+        );
+      }
+      
+      numRead += read;
+    }
+    
+    inputStream.close();
+    return numRead + startPos - 1;
+  }
+  
+  public void _testClientReadsPastBlockEnd() throws IOException {
+    // todo : client has length greater than server--should handle with 
+    // exception, not with corrupt data!!!
+  }
+}  
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Jul  1 08:37:30 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -66,6 +67,7 @@ public class SimulatedFSDataset  impleme
   Configuration conf = null;
   
   static byte[] nullCrcFileData;
+
   {
     DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
                               CHECKSUM_NULL, 16*1024 );
@@ -325,6 +327,16 @@ public class SimulatedFSDataset  impleme
     return binfo.getlength();
   }
 
+  @Override
+  public long getVisibleLength(Block b) throws IOException {
+    return getLength(b);
+  }
+
+  @Override
+  public void setVisibleLength(Block b, long length) throws IOException {
+    //no-op
+  }
+
   /** {@inheritDoc} */
   public Block getStoredBlock(long blkid) throws IOException {
     Block b = new Block(blkid);