Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/11 22:13:31 UTC

svn commit: r636104 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Tue Mar 11 14:13:29 2008
New Revision: 636104

URL: http://svn.apache.org/viewvc?rev=636104&view=rev
Log:
HADOOP-2657. A flush call on the DFSOutputStream flushes the last
partial CRC chunk too.  (dhruba)
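
For context, the change means an application-level flush() now pushes even a
trailing partial CRC chunk out to the datanodes. A minimal usage sketch (the
class name FlushDemo, the path, and the 700-byte size are hypothetical; it
assumes a reachable FileSystem and the default 512-byte checksum chunk):

    import java.util.Random;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FlushDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream stm = fs.create(new Path("/tmp/flushDemo.dat"));
        byte[] data = new byte[700];  // one full 512-byte chunk + a 188-byte partial chunk
        new Random(0).nextBytes(data);
        stm.write(data);
        stm.flush();      // with this change, the 188-byte partial chunk is sent too
        stm.write(data);  // later writes keep extending that same partial chunk
        stm.close();
        fs.close();
      }
    }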


Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 11 14:13:29 2008
@@ -175,6 +175,9 @@
     HADOOP-2955. Fix TestCrcCorruption test failures caused by HADOOP-2758
     (rangadi)
 
+    HADOOP-2657. A flush call on the DFSOutputStream flushes the last
+    partial CRC chunk too.  (dhruba)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Mar 11 14:13:29 2008
@@ -1594,6 +1594,8 @@
     private volatile int errorIndex = 0;
     private IOException lastException = null;
     private long artificialSlowdown = 0;
+    private long lastFlushOffset = -1; // offset when flush was invoked
+    private boolean persistBlocks = false; // persist blocks on namenode
 
     private class Packet {
       ByteBuffer buffer;
@@ -1601,6 +1603,8 @@
       long    offsetInBlock;       // offset in block
       boolean lastPacketInBlock;   // is this the last packet in block?
       int     numChunks;           // number of chunks currently in packet
+      int     flushOffsetBuffer;   // last full chunk that was flushed
+      long    flushOffsetBlock;    // block offset of last full chunk flushed
   
       // create a new packet
       Packet(int size, long offsetInBlock) {
@@ -1610,9 +1614,23 @@
         this.numChunks = 0;
         this.offsetInBlock = offsetInBlock;
         this.seqno = currentSeqno;
+        this.flushOffsetBuffer = 0;
+        this.flushOffsetBlock = 0;
         currentSeqno++;
       }
-  
+
+      // create a new Packet with the contents copied from the
+      // specified one. Shares the same buffer.
+      Packet(Packet old) {
+        this.buffer = old.buffer;
+        this.lastPacketInBlock = old.lastPacketInBlock;
+        this.numChunks = old.numChunks;
+        this.offsetInBlock = old.offsetInBlock;
+        this.seqno = old.seqno;
+        this.flushOffsetBuffer = old.flushOffsetBuffer;
+        this.flushOffsetBlock = old.flushOffsetBlock;
+      }
+
       // writes len bytes from offset off in inarray into
       // this packet.
       // 
@@ -1625,6 +1643,12 @@
       void  writeInt(int value) {
        buffer.putInt(value);
       }
+
+      // sets the last flush offset of this packet.
+      void setFlushOffset(int bufoff, long blockOff) {
+        this.flushOffsetBuffer = bufoff;
+        this.flushOffsetBlock = blockOff;
+      }
     }
   
     //
@@ -1674,7 +1698,9 @@
             try {
               // get packet to be sent.
               one = dataQueue.getFirst();
+              int start = 0;
               int len = one.buffer.limit();
+              long offsetInBlock = one.offsetInBlock;
   
               // get new block from namenode.
               if (blockStream == null) {
@@ -1686,13 +1712,21 @@
                 response.start();
               }
 
+              // If we are sending a sub-packet, then determine the offset 
+              // in block.
+              if (one.flushOffsetBuffer != 0) {
+                offsetInBlock += one.flushOffsetBlock;
+                len = len - one.flushOffsetBuffer;
+                start += one.flushOffsetBuffer;
+              }
+
               // user bytes from 'position' to 'limit'.
               byte[] arr = one.buffer.array();
-              if (one.offsetInBlock >= blockSize) {
+              if (offsetInBlock >= blockSize) {
                 throw new IOException("BlockSize " + blockSize +
                                       " is smaller than data size. " +
                                       " Offset of packet in block " + 
-                                      one.offsetInBlock +
+                                      offsetInBlock +
                                       " Aborting file " + src);
               }
 
@@ -1706,10 +1740,10 @@
   
               // write out data to remote datanode
               blockStream.writeInt(len); // size of this packet
-              blockStream.writeLong(one.offsetInBlock); // data offset in block
+              blockStream.writeLong(offsetInBlock); // data offset in block
               blockStream.writeLong(one.seqno); // sequence num of packet
               blockStream.writeBoolean(one.lastPacketInBlock); 
-              blockStream.write(arr, 0, len);
+              blockStream.write(arr, start, len);
               if (one.lastPacketInBlock) {
                 blockStream.writeInt(0); // indicate end-of-block 
               }
@@ -1717,7 +1751,7 @@
               LOG.debug("DataStreamer block " + block +
                         " wrote packet seqno:" + one.seqno +
                         " size:" + len + 
-                        " offsetInBlock:" + one.offsetInBlock + 
+                        " offsetInBlock:" + offsetInBlock +
                         " lastPacketInBlock:" + one.lastPacketInBlock);
             } catch (IOException e) {
               LOG.warn("DataStreamer Exception: " + e);
@@ -2085,6 +2119,10 @@
           LOG.debug("pipeline = " + nodes[i].getName());
         }
       }
+
+      // persist blocks on namenode on next flush
+      persistBlocks = true;
+
       try {
         LOG.debug("Connecting to " + nodes[0].getName());
         InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
@@ -2182,7 +2220,7 @@
   
     // @see FSOutputSummer#writeChunk()
     @Override
-    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
+    protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                           throws IOException {
       checkOpen();
       isClosed();
@@ -2219,6 +2257,8 @@
   
         if (currentPacket == null) {
           currentPacket = new Packet(packetSize, bytesCurBlock);
+          LOG.debug("DFSClient writeChunk allocating new packet " + 
+                    currentPacket.seqno);
         }
 
         currentPacket.writeInt(len);
@@ -2246,24 +2286,106 @@
           currentPacket = null;
         }
       }
-      //LOG.debug("DFSClient writeChunk with length " + len +
+      //LOG.debug("DFSClient writeChunk done length " + len +
       //          " checksum length " + cklen);
     }
   
     /**
-     * Waits till all existing data is flushed and
-     * confirmations received from datanodes.
+     * All data is written out to datanodes. It is not guaranteed 
+     * that data has been flushed to persistent store on the 
+     * datanode. Block allocations are persisted on namenode.
      */
     @Override
     public synchronized void flush() throws IOException {
+      Packet savePacket = null;
+      int position = 0;
+      long saveOffset = 0;
+
+      try {
+        // Record the state of the current output stream.
+        // This state will be reverted after the flush successfully
+        // finishes. It is necessary to do this so that partial 
+        // checksum chunks are reused by writes that follow this 
+        // flush.
+        if (currentPacket != null) {
+          savePacket = new Packet(currentPacket);
+          position = savePacket.buffer.position();
+        }
+        saveOffset = bytesCurBlock;
+
+        // flush checksum buffer, but keep checksum buffer intact
+        flushBuffer(true);
+
+        LOG.debug("DFSClient flushInternal save position " +  
+                  position +
+                  " cur position " +
+                  ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
+                  " limit " +
+                  ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
+                   " bytesCurBlock " + bytesCurBlock +
+                   " lastFlushOffset " + lastFlushOffset);
+
+        //
+        // Detect the condition that we have already flushed all
+        // outstanding data.
+        //
+        boolean skipFlush = (lastFlushOffset == bytesCurBlock && 
+                             savePacket != null && currentPacket != null &&
+                             savePacket.seqno == currentPacket.seqno);
+        
+        // Do the flush.
+        //
+        if (!skipFlush) {
+
+          // record the valid offset of this flush
+          lastFlushOffset = bytesCurBlock;
+
+          // wait for all packets to be sent and acknowledged
+          flushInternal();
+        }
+        
+        // Restore state of stream. Record the last flush offset 
+        // of the last full chunk that was flushed.
+        //
+        bytesCurBlock = saveOffset;
+        currentPacket = null;
+        if (savePacket != null) {
+          savePacket.buffer.limit(savePacket.buffer.capacity());
+          savePacket.buffer.position(position);
+          savePacket.setFlushOffset(position, 
+                                    savePacket.numChunks * 
+                                    checksum.getBytesPerChecksum());
+          currentPacket = savePacket;
+        }
+
+        // If any new blocks were allocated since the last flush, 
+        // then persist block locations on namenode. 
+        //
+        if (persistBlocks) {
+          namenode.fsync(src, clientName);
+          persistBlocks = false;
+        }
+      } catch (IOException e) {
+          lastException = new IOException("IOException flush:" + e);
+          closed = true;
+          closeThreads();
+          throw e;
+      }
+    }
+
+    /**
+     * Waits till all existing data is flushed and confirmations 
+     * received from datanodes. 
+     */
+    private synchronized void flushInternal() throws IOException {
       checkOpen();
       isClosed();
-  
+
       while (!closed) {
         synchronized (dataQueue) {
           isClosed();
           //
-          // if there is data in the current buffer, send it across
+          // If there is data in the current buffer, send it across
           //
           if (currentPacket != null) {
             currentPacket.buffer.flip();
@@ -2324,6 +2446,23 @@
         s = null;
       }
     }
+ 
+    // shutdown datastreamer and responseprocessor threads.
+    private void closeThreads() throws IOException {
+      try {
+        streamer.close();
+        streamer.join();
+        
+        // shutdown response after streamer has exited.
+        if (response != null) {
+          response.close();
+          response.join();
+          response = null;
+        }
+      } catch (InterruptedException e) {
+        throw new IOException("Failed to shutdown response thread");
+      }
+    }
     
     /**
      * Closes this output stream and releases any system 
@@ -2334,36 +2473,27 @@
       isClosed();
 
       try {
-        flushBuffer();       // flush from all upper layers
+          flushBuffer();       // flush from all upper layers
       
-        // Mark that this packet is the last packet in block.
-        // If there are no outstanding packets and the last packet
-        // was not the last one in the current block, then create a
-        // packet with empty payload.
-        synchronized (dataQueue) {
-          if (currentPacket == null && bytesCurBlock != 0) {
-            currentPacket = new Packet(packetSize, bytesCurBlock);
-            currentPacket.writeInt(0); // one chunk with empty contents
-          }
-          if (currentPacket != null) { 
-            currentPacket.lastPacketInBlock = true;
+          // Mark that this packet is the last packet in block.
+          // If there are no outstanding packets and the last packet
+          // was not the last one in the current block, then create a
+          // packet with empty payload.
+          synchronized (dataQueue) {
+            if (currentPacket == null && bytesCurBlock != 0) {
+              currentPacket = new Packet(packetSize, bytesCurBlock);
+              currentPacket.writeInt(0); // one chunk with empty contents
+            }
+            if (currentPacket != null) { 
+              currentPacket.lastPacketInBlock = true;
+              currentPacket.setFlushOffset(0, 0); // send whole packet
+            }
           }
-        }
 
-        flush();             // flush all data to Datanodes
+        flushInternal();             // flush all data to Datanodes
         closed = true;
- 
-        // wait for threads to finish processing
-        streamer.close();
-        // wait for threads to exit
-        streamer.join();
-        
-        // shutdown response after streamer has exited.
-        if (response != null) {
-          response.close();
-          response.join();
-          response = null;
-        }
+
+        closeThreads();
         
         synchronized (dataQueue) {
           if (blockStream != null) {
@@ -2395,8 +2525,6 @@
             }
           }
         }
-      } catch (InterruptedException e) {
-        throw new IOException("Failed to shutdown response thread");
       } finally {
         closed = true;
       }
@@ -2406,7 +2534,7 @@
       artificialSlowdown = period;
     }
 
-    void setChunksPerPacket(int value) {
+    synchronized void setChunksPerPacket(int value) {
       chunksPerPacket = Math.min(chunksPerPacket, value);
       packetSize = chunkSize * chunksPerPacket;
     }
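
The new flush() above hinges on saving the current packet's buffer position,
draining the buffer, and then restoring limit and position so that later
writes append to the same packet. A self-contained model of that buffer dance
(hypothetical class SaveRestoreDemo; sizes made up, no Hadoop types):

    import java.nio.ByteBuffer;

    public class SaveRestoreDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putInt(42);                 // a pending, partially filled packet
        int position = buf.position();  // save state before flushing

        buf.flip();                     // drain [0, position) to the "datanode"
        byte[] sent = new byte[buf.limit()];
        buf.get(sent);

        buf.limit(buf.capacity());      // restore: undo the flip...
        buf.position(position);         // ...and resume appending where we left off
        buf.putInt(43);
        System.out.println("position after restore and write: " + buf.position()); // 8
      }
    }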

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 11 14:13:29 2008
@@ -2080,12 +2080,33 @@
   // this class is a bufferoutputstream that exposes the number of
   // bytes in the buffer.
   static private class DFSBufferedOutputStream extends BufferedOutputStream {
+    OutputStream out;
     DFSBufferedOutputStream(OutputStream out, int capacity) {
       super(out, capacity);
+      this.out = out;
     }
 
-    int count() {
-      return count;
+    public synchronized void flush() throws IOException {
+      super.flush();
+    }
+
+    /**
+     * Returns true if the channel pointer is already set at the
+     * specified offset. Otherwise returns false.
+     */
+    synchronized boolean samePosition(FSDatasetInterface data, 
+                                      FSDataset.BlockWriteStreams streams,
+                                      Block block,
+                                      long offset) 
+                                      throws IOException {
+      if (data.getChannelPosition(block, streams) + count == offset) {
+        return true;
+      }
+      LOG.debug("samePosition is false. " +
+                " current position " + data.getChannelPosition(block, streams)+
+                " buffered size " + count +
+                " new offset " + offset);
+      return false;
     }
   }
 
@@ -2111,8 +2132,6 @@
     private DataOutputStream mirrorOut;
     private Daemon responder = null;
     private Throttler throttler;
-    private int lastLen = -1;
-    private int curLen = -1;
     private FSDataset.BlockWriteStreams streams;
     private boolean isRecovery = false;
     private String clientName;
@@ -2214,15 +2233,6 @@
             + " expected <= " + bytesPerChecksum);
       }
 
-      if (lastLen > 0 && lastLen != bytesPerChecksum) {
-        throw new IOException("Got wrong length during receiveBlock(" + block
-          + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
-          + bytesPerChecksum);
-      }
-
-      lastLen = curLen;
-      curLen = len;
-
       in.readFully(buf, 0, len);
 
       /*
@@ -2509,9 +2519,8 @@
         }
         return;
       }
-      if (data.getChannelPosition(block, streams) + bufStream.count() == 
-                                                    offsetInBlock) {
-        return;                   // nothing to do 
+      if (bufStream.samePosition(data, streams, block, offsetInBlock)) {
+        return;
       }
       if (offsetInBlock % bytesPerChecksum != 0) {
         throw new IOException("setBlockPosition trying to set position to " +

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Tue Mar 11 14:13:29 2008
@@ -47,7 +47,6 @@
     }
     
     public void close() throws IOException {
-      flush();
       out.close();
     }
   }
@@ -63,8 +62,7 @@
   }
 
   public void close() throws IOException {
-    flush();
-    out.close();
+    out.close();         // This invokes PositionCache.close()
   }
 
   // Returns the underlying output stream. This is used by unit tests.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java Tue Mar 11 14:13:29 2008
@@ -97,7 +97,7 @@
       // local buffer is empty and user data has one chunk
       // checksum and output data
       sum.update(b, off, buf.length);
-      writeChecksumChunk(b, off, buf.length);
+      writeChecksumChunk(b, off, buf.length, false);
       return buf.length;
     }
     
@@ -118,20 +118,34 @@
    * the underlying output stream. 
    */
   protected synchronized void flushBuffer() throws IOException {
-    if(count != 0) {
+    flushBuffer(false);
+  }
+
+  /* Forces any buffered output bytes to be checksummed and written out to
+   * the underlying output stream.  If keep is true, then the state of 
+   * this object remains intact.
+   */
+  protected synchronized void flushBuffer(boolean keep) throws IOException {
+    if (count != 0) {
       int chunkLen = count;
       count = 0;
-      writeChecksumChunk(buf, 0, chunkLen);
+      writeChecksumChunk(buf, 0, chunkLen, keep);
+      if (keep) {
+        count = chunkLen;
+      }
     }
   }
   
   /* Generate checksum for the data chunk and output data chunk & checksum
-   * to the underlying output stream
+   * to the underlying output stream. If keep is true then keep the
+   * current checksum intact, do not reset it.
    */
-  private void writeChecksumChunk(byte b[], int off, int len)
+  private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
   throws IOException {
     int tempChecksum = (int)sum.getValue();
-    sum.reset();
+    if (!keep) {
+      sum.reset();
+    }
     
     checksum[0] = (byte)((tempChecksum >>> 24) & 0xFF);
     checksum[1] = (byte)((tempChecksum >>> 16) & 0xFF);
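
The essence of flushBuffer(true) is to emit the checksum for the buffered
partial chunk without resetting the summer, so that subsequent writes extend
the same chunk and the same running CRC. A self-contained analogue using
plain java.util.zip.CRC32 (hypothetical class KeepFlushDemo, not Hadoop API):

    import java.util.zip.CRC32;

    public class KeepFlushDemo {
      public static void main(String[] args) {
        CRC32 sum = new CRC32();
        byte[] chunk = "partial chunk".getBytes();
        sum.update(chunk, 0, chunk.length);

        long flushedCrc = sum.getValue();  // shipped with the partial chunk;
                                           // with keep == true, no sum.reset()
        byte[] more = " grows".getBytes();
        sum.update(more, 0, more.length);  // extends the same chunk's checksum
        System.out.println("crc at flush=" + flushedCrc + " crc now=" + sum.getValue());
      }
    }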

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Mar 11 14:13:29 2008
@@ -958,11 +958,12 @@
       }
 
       if (out != null) {
-        out.flush();
         
         // Close the underlying stream iff we own it...
         if (ownOutputStream) {
           out.close();
+        } else {
+          out.flush();
         }
         out = null;
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java Tue Mar 11 14:13:29 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileUtil.HardLink;
 
 /**
@@ -39,6 +40,16 @@
   static final int numBlocks = 10;
   static final int fileSize = numBlocks * blockSize + 1;
   boolean simulatedStorage = false;
+  byte[] fileContents = null;
+
+  //
+  // create a buffer that contains the entire test file data.
+  //
+  private void initBuffer(int size) {
+    Random rand = new Random(seed);
+    fileContents = new byte[size];
+    rand.nextBytes(fileContents);
+  }
 
   /*
    * creates a file but does not close it
@@ -61,6 +72,68 @@
     stm.write(buffer);
   }
 
+  //
+  // verify that the data written to the full blocks are sane
+  // 
+  private void checkFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    boolean done = false;
+
+    // wait till all full blocks are confirmed by the datanodes.
+    while (!done) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+      done = true;
+      String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+      if (locations.length < numBlocks) {
+        System.out.println("Number of blocks found " + locations.length);
+        done = false;
+        continue;
+      }
+      for (int idx = 0; idx < numBlocks; idx++) {
+        if (locations[idx].length < repl) {
+          System.out.println("Block index " + idx + " not yet replciated.");
+          done = false;
+          break;
+        }
+      }
+    }
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[numBlocks * blockSize];
+    if (simulatedStorage) {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+      }
+    } else {
+      for (int i= 0; i < expected.length; i++) {  
+        expected[i] = fileContents[i];
+      }
+    }
+    // do a sanity check. Read the file
+    byte[] actual = new byte[numBlocks * blockSize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, expected, "Read 1");
+  }
+
+  private void checkFullFile(FileSystem fs, Path name) throws IOException {
+    FSDataInputStream stm = fs.open(name);
+    byte[] actual = new byte[fileSize];
+    stm.readFully(0, actual);
+    checkData(actual, 0, fileContents, "Read 2");
+    stm.close();
+  }
+
+  private void checkData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+      actual[idx] = 0;
+    }
+  }
+
+
   /**
    * Test that copy on write for blocks works correctly
    */
@@ -126,6 +199,104 @@
                    dataset.detachBlock(b, 1) == false);
       }
 
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test a simple flush on a simple HDFS file.
+   */
+  public void testSimpleFlush() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    initBuffer(fileSize);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+
+      // create a new file.
+      Path file1 = new Path("/simpleFlush.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+      System.out.println("Created file simpleFlush.dat");
+
+      // write to file
+      int mid = fileSize/2;
+      stm.write(fileContents, 0, mid);
+      stm.flush();
+      System.out.println("Wrote and Flushed first part of file.");
+
+      // write the remainder of the file
+      stm.write(fileContents, mid, fileSize - mid);
+      System.out.println("Written second part of file");
+      stm.flush();
+      stm.flush(); // two consecutive flushes are being tested here.
+      System.out.println("Wrote and Flushed second part of file.");
+
+      // verify that full blocks are sane
+      checkFile(fs, file1, 1);
+
+      stm.close();
+      System.out.println("Closed file.");
+
+      // verify that entire file is good
+      checkFullFile(fs, file1);
+
+    } catch (IOException e) {
+      System.out.println("Exception :" + e);
+      throw e; 
+    } catch (Throwable e) {
+      System.out.println("Throwable :" + e);
+      e.printStackTrace();
+      throw new IOException("Throwable : " + e);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that file data can be flushed.
+   */
+  public void testComplexFlush() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    initBuffer(fileSize);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    try {
+
+      // create a new file.
+      Path file1 = new Path("/complexFlush.dat");
+      FSDataOutputStream stm = createFile(fs, file1, 1);
+      System.out.println("Created file complexFlush.dat");
+
+      int start = 0;
+      for (start = 0; (start + 29) < fileSize; ) {
+        stm.write(fileContents, start, 29);
+        stm.flush();
+        start += 29;
+      }
+      stm.write(fileContents, start, fileSize-start);
+
+      // verify that full blocks are sane
+      checkFile(fs, file1, 1);
+      stm.close();
+
+      // verify that entire file is good
+      checkFullFile(fs, file1);
+    } catch (IOException e) {
+      System.out.println("Exception :" + e);
+      throw e; 
+    } catch (Throwable e) {
+      System.out.println("Throwable :" + e);
+      e.printStackTrace();
+      throw new IOException("Throwable : " + e);
     } finally {
       fs.close();
       cluster.shutdown();