Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/09/02 06:16:51 UTC

svn commit: r810353 - in /hadoop/hdfs/branches/HDFS-265: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/

Author: hairong
Date: Wed Sep  2 04:16:51 2009
New Revision: 810353

URL: http://svn.apache.org/viewvc?rev=810353&view=rev
Log:
HDFS-537. DataNode exposes a replica's meta info to BlockReceiver. Contributed by Hairong Kuang.

Added:
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
Modified:
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Wed Sep  2 04:16:51 2009
@@ -16,6 +16,10 @@
     HDFS-565. Introduce block committing logic during new block allocation
     and file close. (shv)
 
+    HDFS-537. DataNode exposes a replica's meta info to BlockReceiver for the
+    support of dfs writes/hflush. It also updates a replica's bytes received,
+    bytes on disk, and bytes acked after receiving a packet. (hairong)
+
   IMPROVEMENTS
 
     HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.

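The entry above packs in a lot: after this change, each replica in a write pipeline carries three counters, and on a single datanode they stay ordered as bytesAcked <= bytesOnDisk <= bytesReceived, because a packet is received, then written, then acked. A minimal sketch of that bookkeeping, using a hypothetical class rather than code from this commit (the real accessors live on the new ReplicaInPipelineInterface, added below):

    // Hypothetical restatement of the per-replica counters HDFS-537 adds;
    // illustration only, not part of this commit.
    class ReplicaByteTracker {
      private long bytesReceived; // end offset of the last packet received
      private long bytesOnDisk;   // end offset of the last packet flushed
      private long bytesAcked;    // end offset of the last packet acked

      synchronized void received(long packetEnd) {
        bytesReceived = Math.max(bytesReceived, packetEnd);
      }
      synchronized void written(long packetEnd) {
        bytesOnDisk = Math.max(bytesOnDisk, packetEnd);
      }
      synchronized void acked(long packetEnd) {
        bytesAcked = Math.max(bytesAcked, packetEnd);
      }

      // receive precedes write precedes ack, so this should always hold
      synchronized boolean invariantHolds() {
        return bytesAcked <= bytesOnDisk && bytesOnDisk <= bytesReceived;
      }
    }
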
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Sep  2 04:16:51 2009
@@ -65,7 +65,6 @@
   private ByteBuffer buf; // contains one full packet.
   private int bufRead; //amount of valid data in the buf
   private int maxPacketReadLen;
-  protected long offsetInBlock;
   protected final String inAddr;
   protected final String myAddr;
   private String mirrorAddr;
@@ -78,6 +77,7 @@
   DatanodeInfo srcDataNode = null;
   private Checksum partialCrc = null;
   private final DataNode datanode;
+  final private ReplicaInPipelineInterface replicaInfo;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
@@ -89,7 +89,6 @@
       this.myAddr = myAddr;
       this.isRecovery = isRecovery;
       this.clientName = clientName;
-      this.offsetInBlock = 0;
       this.srcDataNode = srcDataNode;
       this.datanode = datanode;
       this.checksum = DataChecksum.newDataChecksum(in);
@@ -100,13 +99,14 @@
       // Open local disk out
       //
       if (clientName.length() == 0) { //replication or move
-        streams = datanode.data.writeToTemporary(block);
+        replicaInfo = datanode.data.writeToTemporary(block);
       } else if (finalized && isRecovery) { // client append
-        streams = datanode.data.append(block);
+        replicaInfo = datanode.data.append(block);
         this.finalized = false;
       } else { // client write
-        streams = datanode.data.writeToRbw(block, isRecovery);
+        replicaInfo = datanode.data.writeToRbw(block, isRecovery);
       }
+      streams = replicaInfo.createStreams();
       if (streams != null) {
         this.out = streams.dataOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
@@ -397,10 +397,22 @@
     buf.mark();
     //read the header
     buf.getInt(); // packet length
-    offsetInBlock = buf.getLong(); // get offset of packet in block
+    long offsetInBlock = buf.getLong(); // get offset of packet in block
+    
+    if (offsetInBlock > replicaInfo.getNumBytes()) {
+      throw new IOException("Received an out-of-sequence packet for " + block + 
+          " from " + inAddr + " at offset " + offsetInBlock +
+          ". Expecting packet starting at " + replicaInfo.getNumBytes());
+    }
     long seqno = buf.getLong();    // get seqno
     boolean lastPacketInBlock = (buf.get() != 0);
     
+    int len = buf.getInt();
+    if (len < 0) {
+      throw new IOException("Got wrong length during writeBlock(" + block + 
+                            ") from " + inAddr + " at offset " + 
+                            offsetInBlock + ": " + len); 
+    } 
     int endOfHeader = buf.position();
     buf.reset();
     
@@ -412,8 +424,12 @@
                 " lastPacketInBlock " + lastPacketInBlock);
     }
     
-    setBlockPosition(offsetInBlock);
-    
+    // update received bytes
+    offsetInBlock += len;
+    if (replicaInfo.getNumBytes() < offsetInBlock) {
+      replicaInfo.setNumBytes(offsetInBlock);
+    }
+
     //First write the packet to the mirror:
     if (mirrorOut != null) {
       try {
@@ -425,19 +441,10 @@
     }
 
     buf.position(endOfHeader);        
-    int len = buf.getInt();
     
-    if (len < 0) {
-      throw new IOException("Got wrong length during writeBlock(" + block + 
-                            ") from " + inAddr + " at offset " + 
-                            offsetInBlock + ": " + len); 
-    } 
-
     if (len == 0) {
       LOG.debug("Receiving empty packet for block " + block);
     } else {
-      offsetInBlock += len;
-
       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                             checksumSize;
 
@@ -463,8 +470,10 @@
       }
 
       try {
-        if (!finalized) {
+        if (!finalized && replicaInfo.getBytesOnDisk() < offsetInBlock) {
           //finally write to the disk :
+          setBlockPosition(offsetInBlock-len);
+          
           out.write(pktBuf, dataOff, len);
 
           // If this is a partial chunk, then verify that this is the only
@@ -485,6 +494,7 @@
           } else {
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
+          replicaInfo.setBytesOnDisk(offsetInBlock);
           datanode.myMetrics.bytesWritten.inc(len);
         }
       } catch (IOException iex) {
@@ -499,7 +509,7 @@
     // put in queue for pending acks
     if (responder != null) {
       ((PacketResponder)responder.getRunnable()).enqueue(seqno,
-                                      lastPacketInBlock); 
+                                      lastPacketInBlock, offsetInBlock); 
     }
     
     if (throttler != null) { // throttle I/O
@@ -569,7 +579,7 @@
         close();
 
         // Finalize the block. Does this fsync()?
-        block.setNumBytes(offsetInBlock);
+        block.setNumBytes(replicaInfo.getNumBytes());
         datanode.data.finalizeBlock(block);
         datanode.myMetrics.blocksWritten.inc();
       }
@@ -741,12 +751,13 @@
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
+     * @param lastByteInPacket
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
+    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       if (running) {
         LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
                   " to ack queue.");
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
+        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
         notifyAll();
       }
     }
@@ -820,7 +831,7 @@
               if (!receiver.finalized) {
                 receiver.close();
                 final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-                block.setNumBytes(receiver.offsetInBlock);
+                block.setNumBytes(replicaInfo.getNumBytes());
                 datanode.data.finalizeBlock(block);
                 datanode.myMetrics.blocksWritten.inc();
                 datanode.notifyNamenodeReceivedBlock(block, 
@@ -910,6 +921,9 @@
                   }
                   pkt = ackQueue.removeFirst();
                   expected = pkt.seqno;
+                  if (pkt.lastByteInBlock > replicaInfo.getBytesAcked()) {
+                    replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+                  }
                   notifyAll();
                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
                   if (seqno != expected) {
@@ -953,7 +967,7 @@
             if (lastPacketInBlock && !receiver.finalized) {
               receiver.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(receiver.offsetInBlock);
+              block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
               datanode.myMetrics.blocksWritten.inc();
               datanode.notifyNamenodeReceivedBlock(block, 
@@ -1042,10 +1056,12 @@
   static private class Packet {
     long seqno;
     boolean lastPacketInBlock;
+    long lastByteInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock) {
+    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
+      this.lastByteInBlock = lastByteInPacket;
     }
   }
 }

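Two validations move ahead of any side effect in the receivePacket() hunks above: the packet's offset must not jump past what the replica has already received, and the data length must be non-negative. The disk write later fires only while getBytesOnDisk() < offsetInBlock, so a packet retransmitted during pipeline recovery is skipped rather than rewritten. A standalone sketch of the header checks, with assumed names (not the committed method):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Illustrative restatement of the checks receivePacket() now runs
    // before mirroring or writing; shape and names are hypothetical.
    class PacketHeaderCheck {
      static long check(ByteBuffer buf, long bytesReceivedSoFar)
          throws IOException {
        buf.getInt();                       // packet length, unused here
        long offsetInBlock = buf.getLong(); // packet's start offset in block
        if (offsetInBlock > bytesReceivedSoFar) {
          // a gap means a packet was lost or reordered upstream
          throw new IOException("out-of-sequence packet at " + offsetInBlock
              + ", expecting " + bytesReceivedSoFar);
        }
        buf.getLong();                      // seqno
        buf.get();                          // lastPacketInBlock flag
        int len = buf.getInt();
        if (len < 0) {
          throw new IOException("negative data length: " + len);
        }
        return offsetInBlock + len;         // new bytes-received high-water mark
      }
    }
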
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Sep  2 04:16:51 2009
@@ -954,12 +954,6 @@
                                 new FileInputStream(metaInFile.getFD()));
   }
     
-  private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
-      return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
-          new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
-  }
-
   /**
    * Make a copy of the block if this block is linked to an existing
    * snapshot. This ensures that modifying this block does not modify
@@ -1100,7 +1094,7 @@
   }
 
   @Override  // FSDatasetInterface
-  public BlockWriteStreams append(Block b)
+  public ReplicaInPipelineInterface append(Block b)
       throws IOException {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
@@ -1129,9 +1123,9 @@
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     File newBlkFile = v.createRbwFile(b);
     File oldmeta = replicaInfo.getMetaFile();
-    replicaInfo = new ReplicaBeingWritten(replicaInfo,
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(replicaInfo,
         v, newBlkFile.getParentFile(), Thread.currentThread());
-    File newmeta = replicaInfo.getMetaFile();
+    File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
     if (DataNode.LOG.isDebugEnabled()) {
@@ -1158,31 +1152,23 @@
     }
     
     // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(replicaInfo);
+    volumeMap.add(newReplicaInfo);
     
-    File metafile = getMetaFile(newBlkFile, b);
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("append blockfile is " + newBlkFile 
-                       + " of size " + newBlkFile.length());
-      DataNode.LOG.debug("append metafile is " + metafile 
-                       + " of size " + metafile.length());
-    }    
-    // return the write stream
-    return createBlockWriteStreams(newBlkFile , metafile);
+    return newReplicaInfo;
   }
 
   @Override
-  public BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
+  public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
       throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b);
-    File f = null;
+    ReplicaBeingWritten newReplicaInfo;
     if (replicaInfo == null) { // create a new block
       FSVolume v = volumes.getNextVolume(b.getNumBytes());
       // create a rbw file to hold block in the designated volume
-      f = v.createRbwFile(b);
-      replicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
+      File f = v.createRbwFile(b);
+      newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
           b.getGenerationStamp(), v, f.getParentFile());
-      volumeMap.add(replicaInfo);
+      volumeMap.add(newReplicaInfo);
     } else {
       if (!isRecovery) {
         throw new BlockAlreadyExistsException("Block " + b +
@@ -1193,35 +1179,21 @@
         throw new BlockNotFoundException(
             BlockNotFoundException.NON_RBW_REPLICA + b);
       }
-      ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline)replicaInfo;
+      newReplicaInfo = (ReplicaBeingWritten)replicaInfo;
       synchronized (this) {
         //
         // Is it already in the write process?
         //
-        replicaInPipeline.stopWriter();
-        replicaInPipeline.setWriter(Thread.currentThread());
+        newReplicaInfo.stopWriter();
+        newReplicaInfo.setWriter(Thread.currentThread());
       }
-      f = replicaInfo.getBlockFile();
     }
 
-    //
-    // Finally, allow a writer to the block file
-    // REMIND - mjc - make this a filter stream that enforces a max
-    // block size, so clients can't go crazy
-    //
-    File metafile = getMetaFile(f, b);
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("writeToRbw blockfile is " + f +
-                         " of size " + f.length());
-      DataNode.LOG.debug("writeToRbw metafile is " + metafile +
-                         " of size " + metafile.length());
-    }
-    return createBlockWriteStreams( f , metafile);
-
+    return newReplicaInfo;
   }
   
   @Override
-  public BlockWriteStreams writeToTemporary(Block b)
+  public ReplicaInPipelineInterface writeToTemporary(Block b)
       throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b);
     if (replicaInfo != null) {
@@ -1230,23 +1202,14 @@
           " and thus cannot be created.");
     }
     
-    File f = null;
     FSVolume v = volumes.getNextVolume(b.getNumBytes());
     // create a temporary file to hold block in the designated volume
-    f = v.createTmpFile(b);
-    replicaInfo = new ReplicaInPipeline(b.getBlockId(), 
+    File f = v.createTmpFile(b);
+    ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile());
-    volumeMap.add(replicaInfo);
+    volumeMap.add(newReplicaInfo);
     
-    // return the output streams
-    File metafile = getMetaFile(f, b);
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("writeToTemp blockfile is " + f + 
-          " of size " + f.length());
-      DataNode.LOG.debug("writeToTemp metafile is " + metafile + 
-          " of size " + metafile.length());
-    }
-    return createBlockWriteStreams( f , metafile);
+    return newReplicaInfo;
   }
 
   /**
@@ -1293,16 +1256,6 @@
     return vol.createTmpFile(blk);
   }
 
-  synchronized File createRbwFile( FSVolume vol, Block blk ) throws IOException {
-    if ( vol == null ) {
-      vol = getReplicaInfo( blk ).getVolume();
-      if ( vol == null ) {
-        throw new IOException("Could not find volume for block " + blk);
-      }
-    }
-    return vol.createTmpFile(blk);
-  }
-
   //
   // REMIND - mjc - eventually we should have a timeout system
   // in place to clean up block files left by abandoned clients.
@@ -1329,11 +1282,11 @@
       File f = replicaInfo.getBlockFile();
       if (v == null) {
         throw new IOException("No volume for temporary file " + f + 
-            " for block " + b);
+            " for block " + replicaInfo);
       }
 
-      File dest = v.addBlock(b, f);
-      newReplicaInfo = new FinalizedReplica(b, v, dest.getParentFile());
+      File dest = v.addBlock(replicaInfo, f);
+      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(newReplicaInfo);
   }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Sep  2 04:16:51 2009
@@ -174,11 +174,10 @@
    * Creates a temporary replica and returns output streams to write data and CRC
    * 
    * @param b block
-   * @return a BlockWriteStreams object to allow writing the block data
-   *  and CRC
+   * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public BlockWriteStreams writeToTemporary(Block b) throws IOException;
+  public ReplicaInPipelineInterface writeToTemporary(Block b) throws IOException;
 
   /**
    * Creates/recovers a RBW replica and returns output streams to 
@@ -186,20 +185,19 @@
    * 
    * @param b block
    * @param isRecovery True if this is part of error recovery, otherwise false
-   * @return a BlockWriteStreams object to allow writing the block data
-   *  and CRC
+   * @return the meta info of the replica which is being written to
    * @throws IOException if an error occurs
    */
-  public BlockWriteStreams writeToRbw(Block b, boolean isRecovery) throws IOException;
+  public ReplicaInPipelineInterface writeToRbw(Block b, boolean isRecovery)
+  throws IOException;
 
   /**
    * Append to a finalized replica and returns output streams to write data and CRC
    * @param b block
-   * @return a BlockWriteStreams object to allow writing the block data
-   *  and CRC
+   * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public BlockWriteStreams append(Block b) throws IOException;
+  public ReplicaInPipelineInterface append(Block b) throws IOException;
 
   /**
    * Update the block to the new generation stamp and length.  

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Wed Sep  2 04:16:51 2009
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+import org.apache.hadoop.io.IOUtils;
 
 /** 
  * This class defines a replica in a pipeline, which
@@ -32,7 +36,8 @@
  * 
  * The base class implements a temporary replica
  */
-class ReplicaInPipeline extends ReplicaInfo {
+class ReplicaInPipeline extends ReplicaInfo
+                        implements ReplicaInPipelineInterface {
   private long bytesAcked;
   private long bytesOnDisk;
   private Thread writer;
@@ -91,35 +96,23 @@
     return ReplicaState.TEMPORARY;
   }
   
-  /**
-   * Get the number of bytes acked
-   * @return the number of bytes acked
-   */
-  long getBytesAcked() {
+  @Override // ReplicaInPipelineInterface
+  public long getBytesAcked() {
     return bytesAcked;
   }
   
-  /**
-   * Set the number bytes that have acked
-   * @param bytesAcked
-   */
-  void setBytesAcked(long bytesAcked) {
+  @Override // ReplicaInPipelineInterface
+  public void setBytesAcked(long bytesAcked) {
     this.bytesAcked = bytesAcked;
   }
   
-  /**
-   * Get the number of bytes that have written to disk
-   * @return the number of bytes that have written to disk
-   */
-  long getBytesOnDisk() {
+  @Override // ReplicaInPipelineInterface
+  public long getBytesOnDisk() {
     return bytesOnDisk;
   }
   
-  /**
-   * Set the number of bytes on disk
-   * @param bytesOnDisk number of bytes on disk
-   */
-  void setBytesOnDisk(long bytesOnDisk) {
+  @Override //ReplicaInPipelineInterface
+  public void setBytesOnDisk(long bytesOnDisk) {
     this.bytesOnDisk = bytesOnDisk;
   }
   
@@ -155,4 +148,29 @@
   public int hashCode() {
     return super.hashCode();
   }
+  
+  @Override // ReplicaInPipelineInterface
+  public BlockWriteStreams createStreams() throws IOException {
+    File blockFile = getBlockFile();
+    File metaFile = getMetaFile();
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+                         " of size " + blockFile.length());
+      DataNode.LOG.debug("writeTo metafile is " + metaFile +
+                         " of size " + metaFile.length());
+    }
+    FileOutputStream blockOut = null;
+    FileOutputStream crcOut = null;
+    try {
+      blockOut = new FileOutputStream(
+          new RandomAccessFile( blockFile, "rw" ).getFD() );
+      crcOut = new FileOutputStream(
+          new RandomAccessFile( metaFile, "rw" ).getFD() );
+      return new BlockWriteStreams(blockOut, crcOut);
+    } catch (IOException e) {
+      IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(crcOut);
+      throw e;
+    }
+  }  
 }

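createStreams() above opens two file descriptors and must not leak the first when the second open fails. The same acquire-both-or-release shape in isolation (quietClose stands in for org.apache.hadoop.io.IOUtils.closeStream; the wrapper class and paths are hypothetical):

    import java.io.Closeable;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    class TwoStreamOpener {
      // Open data and checksum streams together; on any failure close
      // whatever was already opened, then rethrow.
      static FileOutputStream[] openBoth(String dataPath, String metaPath)
          throws IOException {
        FileOutputStream dataOut = null;
        FileOutputStream metaOut = null;
        try {
          dataOut = new FileOutputStream(
              new RandomAccessFile(dataPath, "rw").getFD());
          metaOut = new FileOutputStream(
              new RandomAccessFile(metaPath, "rw").getFD());
          return new FileOutputStream[] { dataOut, metaOut };
        } catch (IOException e) {
          quietClose(dataOut);
          quietClose(metaOut);
          throw e;
        }
      }

      private static void quietClose(Closeable c) {
        if (c == null) return;
        try { c.close(); } catch (IOException ignored) { }
      }
    }
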
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=810353&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Wed Sep  2 04:16:51 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
+
+/** 
+ * This defines the interface of a replica in a pipeline that's being written to
+ */
+interface ReplicaInPipelineInterface {
+  /**
+   * Get the number of bytes received
+   * @return the number of bytes that have been received
+   */
+  long getNumBytes();
+  
+  /**
+   * Set the number of bytes received
+   * @param bytesReceived number of bytes received
+   */
+  void setNumBytes(long bytesReceived);
+  
+  /**
+   * Get the number of bytes acked
+   * @return the number of bytes acked
+   */
+  long getBytesAcked();
+  
+  /**
+   * Set the number of bytes that have been acked
+   * @param bytesAcked
+   */
+  void setBytesAcked(long bytesAcked);
+  
+  /**
+   * Get the number of bytes that have been written to disk
+   * @return the number of bytes that have been written to disk
+   */
+  long getBytesOnDisk();
+  
+  /**
+   * Set the number of bytes on disk
+   * @param bytesOnDisk number of bytes on disk
+   */
+  void setBytesOnDisk(long bytesOnDisk);
+  
+  /**
+   * Create output streams for writing to this replica, 
+   * one for the block file and one for the CRC file
+   * 
+   * @return output streams for writing
+   * @throws IOException if any error occurs
+   */
+  public BlockWriteStreams createStreams() throws IOException;
+}

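Since the interface is package-private, any implementation has to live in org.apache.hadoop.hdfs.server.datanode; SimulatedFSDataset.BInfo (modified below) is the real test double. For illustration only, a minimal in-memory implementation, assuming BlockWriteStreams takes (dataOut, checksumOut) as the diffs above suggest:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;

    // Hypothetical in-memory replica for tests; not part of this commit.
    class InMemoryReplica implements ReplicaInPipelineInterface {
      private long numBytes, bytesAcked, bytesOnDisk;
      private final ByteArrayOutputStream data = new ByteArrayOutputStream();
      private final ByteArrayOutputStream crc = new ByteArrayOutputStream();

      public long getNumBytes() { return numBytes; }
      public void setNumBytes(long bytesReceived) { numBytes = bytesReceived; }
      public long getBytesAcked() { return bytesAcked; }
      public void setBytesAcked(long bytesAcked) { this.bytesAcked = bytesAcked; }
      public long getBytesOnDisk() { return bytesOnDisk; }
      public void setBytesOnDisk(long bytesOnDisk) { this.bytesOnDisk = bytesOnDisk; }

      public BlockWriteStreams createStreams() throws IOException {
        return new BlockWriteStreams(data, crc);
      }
    }
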
Modified: hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/findbugsExcludeFile.xml Wed Sep  2 04:16:51 2009
@@ -214,10 +214,7 @@
      -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
-       <Or>
-         <Method name="createBlockWriteStreams" />
-         <Method name="getTmpInputStreams" />
-       </Or>
+       <Method name="getTmpInputStreams" />
        <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
      </Match>
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Sep  2 04:16:51 2009
@@ -77,11 +77,14 @@
       nullCrcFileData[i+2] = nullCrcHeader[i];
     }
   }
-  
-  private class BInfo { // information about a single block
+
+  // information about a single block
+  private class BInfo implements ReplicaInPipelineInterface {
     Block theBlock;
     private boolean finalized = false; // if not finalized => ongoing creation
     SimulatedOutputStream oStream = null;
+    private long bytesAcked;
+    private long bytesRcvd;
     BInfo(Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -108,20 +111,21 @@
 
     synchronized void updateBlock(Block b) {
       theBlock.setGenerationStamp(b.getGenerationStamp());
-      setlength(b.getNumBytes());
+      setNumBytes(b.getNumBytes());
+      setBytesOnDisk(b.getNumBytes());
     }
     
-    synchronized long getlength() {
+    synchronized public long getNumBytes() {
       if (!finalized) {
-         return oStream.getLength();
+         return bytesRcvd;
       } else {
         return theBlock.getNumBytes();
       }
     }
 
-    synchronized void setlength(long length) {
+    synchronized public void setNumBytes(long length) {
       if (!finalized) {
-         oStream.setLength(length);
+         bytesRcvd = length;
       } else {
         theBlock.setNumBytes(length);
       }
@@ -170,7 +174,20 @@
       oStream = null;
       return;
     }
-    
+
+    synchronized void unfinalizeBlock() throws IOException {
+      if (!finalized) {
+        throw new IOException("Unfinalizing a block that's not finalized "
+            + theBlock);
+      }
+      finalized = false;
+      oStream = new SimulatedOutputStream();
+      long blockLen = theBlock.getNumBytes();
+      oStream.setLength(blockLen);
+      bytesRcvd = blockLen;
+      bytesAcked = blockLen;
+    }
+
     SimulatedInputStream getMetaIStream() {
       return new SimulatedInputStream(nullCrcFileData);  
     }
@@ -178,6 +195,49 @@
     synchronized boolean isFinalized() {
       return finalized;
     }
+
+    @Override
+    synchronized public BlockWriteStreams createStreams() throws IOException {
+      if (finalized) {
+        throw new IOException("Trying to write to a finalized replica "
+            + theBlock);
+      } else {
+        SimulatedOutputStream crcStream = new SimulatedOutputStream();
+        return new BlockWriteStreams(oStream, crcStream);
+      }
+    }
+
+    @Override
+    synchronized public long getBytesAcked() {
+      if (finalized) {
+        return theBlock.getNumBytes();
+      } else {
+        return bytesAcked;
+      }
+    }
+
+    @Override
+    synchronized public void setBytesAcked(long bytesAcked) {
+      if (!finalized) {
+        this.bytesAcked = bytesAcked;
+      }
+    }
+
+    @Override
+    synchronized public long getBytesOnDisk() {
+      if (finalized) {
+        return theBlock.getNumBytes();
+      } else {
+        return oStream.getLength();
+      }
+    }
+
+    @Override
+    synchronized public void setBytesOnDisk(long bytesOnDisk) {
+      if (!finalized) {
+        oStream.setLength(bytesOnDisk);
+      }
+    }
   }
   
   static private class SimulatedStorage {
@@ -311,7 +371,7 @@
     if (binfo == null) {
       throw new IOException("Finalizing a non existing block " + b);
     }
-    return binfo.getlength();
+    return binfo.getNumBytes();
   }
 
   /** {@inheritDoc} */
@@ -322,7 +382,7 @@
       return null;
     }
     b.setGenerationStamp(binfo.getGenerationStamp());
-    b.setNumBytes(binfo.getlength());
+    b.setNumBytes(binfo.getNumBytes());
     return b;
   }
 
@@ -350,7 +410,7 @@
         DataNode.LOG.warn("Invalidate: Missing block");
         continue;
       }
-      storage.free(binfo.getlength());
+      storage.free(binfo.getNumBytes());
       blockMap.remove(b);
     }
       if (error) {
@@ -381,25 +441,35 @@
   }
 
   @Override
-  public BlockWriteStreams append(Block b) throws IOException {
-    return writeToBlock(b, true);
+  public ReplicaInPipelineInterface append(Block b) throws IOException {
+    BInfo binfo = blockMap.get(b);
+    if (binfo == null || !binfo.isFinalized()) {
+      throw new BlockNotFoundException("Block " + b
+          + " is not valid, and cannot be appended to.");
+    }
+    binfo.unfinalizeBlock();
+    return binfo;
   }
 
   @Override
-  public synchronized BlockWriteStreams writeToRbw(Block b, boolean isRecovery)
-      throws IOException {
-    return writeToBlock(b, isRecovery);
+  public synchronized ReplicaInPipelineInterface writeToRbw(Block b,
+      boolean isRecovery) throws IOException {
+    if (isValidBlock(b)) {
+      throw new BlockAlreadyExistsException("Block " + b
+          + " is valid, and cannot be written to.");
+    }
+    BInfo binfo = blockMap.get(b);
+    if (isRecovery && binfo != null) {
+      return binfo;
+    }
+    binfo = new BInfo(b, true);
+    blockMap.put(b, binfo);
+    return binfo;
   }
 
   @Override
-  public synchronized BlockWriteStreams writeToTemporary(Block b)
+  public synchronized ReplicaInPipelineInterface writeToTemporary(Block b)
       throws IOException {
-    return writeToBlock(b, false);
-  }
-
-  private synchronized BlockWriteStreams writeToBlock(Block b, 
-                                            boolean isRecovery)
-                                            throws IOException {
     if (isValidBlock(b)) {
           throw new BlockAlreadyExistsException("Block " + b + 
               " is valid, and cannot be written to.");
@@ -408,10 +478,9 @@
         throw new BlockAlreadyExistsException("Block " + b + 
             " is being written, and cannot be written to.");
     }
-      BInfo binfo = new BInfo(b, true);
-      blockMap.put(b, binfo);
-      SimulatedOutputStream crcStream = new SimulatedOutputStream();
-      return new BlockWriteStreams(binfo.oStream, crcStream);
+    BInfo binfo = new BInfo(b, true);
+    blockMap.put(b, binfo);
+    return binfo;
   }
 
   public synchronized InputStream getBlockInputStream(Block b)
@@ -500,7 +569,7 @@
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
-    return binfo.getlength();
+    return binfo.getNumBytes();
   }
 
   public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, 
@@ -510,7 +579,7 @@
     if (binfo == null) {
       throw new IOException("No such Block " + b );
     }
-    binfo.setlength(dataOffset);
+    binfo.setBytesOnDisk(dataOffset);
   }
 
   /** 

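The simulator's new append() path relies on unfinalizeBlock() above: a finalized replica flips back to writable, and every counter is seeded from the committed length so appended bytes continue from there. Condensed into a hypothetical standalone class (illustration only):

    import java.io.IOException;

    // Condensed form of BInfo.unfinalizeBlock(); not part of this commit.
    class AppendableReplica {
      private boolean finalized = true;
      private final long committedLength; // block length at finalize time
      private long bytesRcvd, bytesAcked;

      AppendableReplica(long committedLength) {
        this.committedLength = committedLength;
      }

      void unfinalize() throws IOException {
        if (!finalized) {
          throw new IOException("replica is not finalized");
        }
        finalized = false;
        // append resumes at the committed length
        bytesRcvd = committedLength;
        bytesAcked = committedLength;
      }
    }
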
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Wed Sep  2 04:16:51 2009
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -62,14 +63,19 @@
     int bytesAdded = 0;
     for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) {
       Block b = new Block(i, 0, 0); // we pass expected len as zero, - fsdataset should use the sizeof actual data written
-      OutputStream dataOut  = fsdataset.writeToRbw(b, false).dataOut;
-      assertEquals(0, fsdataset.getLength(b));
-      for (int j=1; j <= blockIdToLen(i); ++j) {
-        dataOut.write(j);
-        assertEquals(j, fsdataset.getLength(b)); // correct length even as we write
-        bytesAdded++;
+      ReplicaInPipelineInterface bInfo = fsdataset.writeToRbw(b, false);
+      BlockWriteStreams out = bInfo.createStreams();
+      try {
+        OutputStream dataOut  = out.dataOut;
+        assertEquals(0, fsdataset.getLength(b));
+        for (int j=1; j <= blockIdToLen(i); ++j) {
+          dataOut.write(j);
+          assertEquals(j, bInfo.getBytesOnDisk()); // correct length even as we write
+          bytesAdded++;
+        }
+      } finally {
+        out.close();
       }
-      dataOut.close();
       b.setNumBytes(blockIdToLen(i));
       fsdataset.finalizeBlock(b);
       assertEquals(blockIdToLen(i), fsdataset.getLength(b));

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java?rev=810353&r1=810352&r2=810353&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java Wed Sep  2 04:16:51 2009
@@ -120,10 +120,10 @@
   }
   
   private void testAppend(FSDataset dataSet) throws IOException {
-    dataSet.append(blocks[FINALIZED]).close();  // successful
+    dataSet.append(blocks[FINALIZED]);  // successful
     
     try {
-      dataSet.append(blocks[TEMPORARY]).close();
+      dataSet.append(blocks[TEMPORARY]);
       Assert.fail("Should not have appended to a temporary replica " 
           + blocks[TEMPORARY]);
     } catch (BlockNotFoundException e) {
@@ -132,7 +132,7 @@
     }
 
     try {
-      dataSet.append(blocks[RBW]).close();
+      dataSet.append(blocks[RBW]);
       Assert.fail("Should not have appended to an RBW replica" + blocks[RBW]);
     } catch (BlockNotFoundException e) {
       Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -140,7 +140,7 @@
     }
 
     try {
-      dataSet.append(blocks[RWR]).close();
+      dataSet.append(blocks[RWR]);
       Assert.fail("Should not have appended to an RWR replica" + blocks[RWR]);
     } catch (BlockNotFoundException e) {
       Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -148,7 +148,7 @@
     }
 
     try {
-      dataSet.append(blocks[RUR]).close();
+      dataSet.append(blocks[RUR]);
       Assert.fail("Should not have appended to an RUR replica" + blocks[RUR]);
     } catch (BlockNotFoundException e) {
       Assert.assertEquals(BlockNotFoundException.UNFINALIZED_REPLICA +
@@ -156,7 +156,7 @@
     }
 
     try {
-      dataSet.append(blocks[NON_EXISTENT]).close();
+      dataSet.append(blocks[NON_EXISTENT]);
       Assert.fail("Should not have appended to a non-existent replica " + 
           blocks[NON_EXISTENT]);
     } catch (BlockNotFoundException e) {
@@ -167,7 +167,7 @@
 
   private void testWriteToRbw(FSDataset dataSet) throws IOException {
     try {
-      dataSet.writeToRbw(blocks[FINALIZED], true).close();
+      dataSet.writeToRbw(blocks[FINALIZED], true);
       Assert.fail("Should not have recovered a finalized replica " +
           blocks[FINALIZED]);
     } catch (BlockNotFoundException e) {
@@ -176,14 +176,14 @@
     }
  
     try {
-      dataSet.writeToRbw(blocks[FINALIZED], false).close();
+      dataSet.writeToRbw(blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
       		"finalized " + blocks[FINALIZED]);
     } catch (BlockAlreadyExistsException e) {
     }
  
     try {
-      dataSet.writeToRbw(blocks[TEMPORARY], true).close();
+      dataSet.writeToRbw(blocks[TEMPORARY], true);
       Assert.fail("Should not have recovered a temporary replica " +
           blocks[TEMPORARY]);
     } catch (BlockNotFoundException e) {
@@ -192,23 +192,23 @@
     }
 
     try {
-      dataSet.writeToRbw(blocks[TEMPORARY], false).close();
+      dataSet.writeToRbw(blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
       		"temporary " + blocks[TEMPORARY]);
     } catch (BlockAlreadyExistsException e) {
     }
         
-    dataSet.writeToRbw(blocks[RBW], true).close();  // expect to be successful
+    dataSet.writeToRbw(blocks[RBW], true);  // expect to be successful
     
     try {
-      dataSet.writeToRbw(blocks[RBW], false).close();
+      dataSet.writeToRbw(blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (BlockAlreadyExistsException e) {
     }
     
     try {
-      dataSet.writeToRbw(blocks[RWR], true).close();
+      dataSet.writeToRbw(blocks[RWR], true);
       Assert.fail("Should not have recovered a RWR replica " + blocks[RWR]);
     } catch (BlockNotFoundException e) {
       Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
@@ -216,14 +216,14 @@
     }
 
     try {
-      dataSet.writeToRbw(blocks[RWR], false).close();
+      dataSet.writeToRbw(blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (BlockAlreadyExistsException e) {
     }
     
     try {
-      dataSet.writeToRbw(blocks[RUR], true).close();
+      dataSet.writeToRbw(blocks[RUR], true);
       Assert.fail("Should not have recovered a RUR replica " + blocks[RUR]);
     } catch (BlockNotFoundException e) {
       Assert.assertEquals(BlockNotFoundException.NON_RBW_REPLICA + 
@@ -231,58 +231,58 @@
     }
 
     try {
-      dataSet.writeToRbw(blocks[RUR], false).close();
+      dataSet.writeToRbw(blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (BlockAlreadyExistsException e) {
     }
     
-    dataSet.writeToRbw(blocks[NON_EXISTENT], true).close();
+    dataSet.writeToRbw(blocks[NON_EXISTENT], true);
     
     // remove this replica
     ReplicaInfo removedReplica = dataSet.volumeMap.remove(blocks[NON_EXISTENT]);
     removedReplica.getBlockFile().delete();
     removedReplica.getMetaFile().delete();
     
-    dataSet.writeToRbw(blocks[NON_EXISTENT], false).close();
+    dataSet.writeToRbw(blocks[NON_EXISTENT], false);
   }
   
   private void testWriteToTemporary(FSDataset dataSet) throws IOException {
     try {
-      dataSet.writeToTemporary(blocks[FINALIZED]).close();
+      dataSet.writeToTemporary(blocks[FINALIZED]);
       Assert.fail("Should not have created a temporary replica that was " +
       		"finalized " + blocks[FINALIZED]);
     } catch (BlockAlreadyExistsException e) {
     }
  
     try {
-      dataSet.writeToTemporary(blocks[TEMPORARY]).close();
+      dataSet.writeToTemporary(blocks[TEMPORARY]);
       Assert.fail("Should not have created a replica that had created as" +
       		"temporary " + blocks[TEMPORARY]);
     } catch (BlockAlreadyExistsException e) {
     }
     
     try {
-      dataSet.writeToTemporary(blocks[RBW]).close();
+      dataSet.writeToTemporary(blocks[RBW]);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (BlockAlreadyExistsException e) {
     }
     
     try {
-      dataSet.writeToTemporary(blocks[RWR]).close();
+      dataSet.writeToTemporary(blocks[RWR]);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (BlockAlreadyExistsException e) {
     }
     
     try {
-      dataSet.writeToTemporary(blocks[RUR]).close();
+      dataSet.writeToTemporary(blocks[RUR]);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (BlockAlreadyExistsException e) {
     }
     
-    dataSet.writeToTemporary(blocks[NON_EXISTENT]).close();
+    dataSet.writeToTemporary(blocks[NON_EXISTENT]);
   }
 }