You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/11 22:13:31 UTC
svn commit: r636104 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Tue Mar 11 14:13:29 2008
New Revision: 636104
URL: http://svn.apache.org/viewvc?rev=636104&view=rev
Log:
HADOOP-2657. A flush call on the DFSOutputStream flushes the last
partial CRC chunk too. (dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java
hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Mar 11 14:13:29 2008
@@ -175,6 +175,9 @@
HADOOP-2955. Fix TestCrcCorruption test failures caused by HADOOP-2758
(rangadi)
+ HADOOP-2657. A flush call on the DFSOutputStream flushes the last
+ partial CRC chunk too. (dhruba)
+
Release 0.16.1 - 2008-03-13
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Mar 11 14:13:29 2008
@@ -1594,6 +1594,8 @@
private volatile int errorIndex = 0;
private IOException lastException = null;
private long artificialSlowdown = 0;
+ private long lastFlushOffset = -1; // offset when flush was invoked
+ private boolean persistBlocks = false; // persist blocks on namenode
private class Packet {
ByteBuffer buffer;
@@ -1601,6 +1603,8 @@
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
+ int flushOffsetBuffer; // last full chunk that was flushed
+ long flushOffsetBlock; // block offset of last full chunk flushed
// create a new packet
Packet(int size, long offsetInBlock) {
@@ -1610,9 +1614,23 @@
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
this.seqno = currentSeqno;
+ this.flushOffsetBuffer = 0;
+ this.flushOffsetBlock = 0;
currentSeqno++;
}
-
+
+ // create a new Packet with the contents copied from the
+ // specified one. Shares the same buffer.
+ Packet(Packet old) {
+ this.buffer = old.buffer;
+ this.lastPacketInBlock = old.lastPacketInBlock;
+ this.numChunks = old.numChunks;
+ this.offsetInBlock = old.offsetInBlock;
+ this.seqno = old.seqno;
+ this.flushOffsetBuffer = old.flushOffsetBuffer;
+ this.flushOffsetBlock = old.flushOffsetBlock;
+ }
+
// writes len bytes from offset off in inarray into
// this packet.
//
@@ -1625,6 +1643,12 @@
void writeInt(int value) {
buffer.putInt(value);
}
+
+ // sets the last flush offset of this packet.
+ void setFlushOffset(int bufoff, long blockOff) {
+ this.flushOffsetBuffer = bufoff;;
+ this.flushOffsetBlock = blockOff;
+ }
}
//
@@ -1674,7 +1698,9 @@
try {
// get packet to be sent.
one = dataQueue.getFirst();
+ int start = 0;
int len = one.buffer.limit();
+ long offsetInBlock = one.offsetInBlock;
// get new block from namenode.
if (blockStream == null) {
@@ -1686,13 +1712,21 @@
response.start();
}
+ // If we are sending a sub-packet, then determine the offset
+ // in block.
+ if (one.flushOffsetBuffer != 0) {
+ offsetInBlock += one.flushOffsetBlock;
+ len = len - one.flushOffsetBuffer;
+ start += one.flushOffsetBuffer;
+ }
+
// user bytes from 'position' to 'limit'.
byte[] arr = one.buffer.array();
- if (one.offsetInBlock >= blockSize) {
+ if (offsetInBlock >= blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
- one.offsetInBlock +
+ offsetInBlock +
" Aborting file " + src);
}
@@ -1706,10 +1740,10 @@
// write out data to remote datanode
blockStream.writeInt(len); // size of this packet
- blockStream.writeLong(one.offsetInBlock); // data offset in block
+ blockStream.writeLong(offsetInBlock); // data offset in block
blockStream.writeLong(one.seqno); // sequence num of packet
blockStream.writeBoolean(one.lastPacketInBlock);
- blockStream.write(arr, 0, len);
+ blockStream.write(arr, start, len);
if (one.lastPacketInBlock) {
blockStream.writeInt(0); // indicate end-of-block
}
@@ -1717,7 +1751,7 @@
LOG.debug("DataStreamer block " + block +
" wrote packet seqno:" + one.seqno +
" size:" + len +
- " offsetInBlock:" + one.offsetInBlock +
+ " offsetInBlock:" + offsetInBlock +
" lastPacketInBlock:" + one.lastPacketInBlock);
} catch (IOException e) {
LOG.warn("DataStreamer Exception: " + e);
@@ -2085,6 +2119,10 @@
LOG.debug("pipeline = " + nodes[i].getName());
}
}
+
+ // persist blocks on namenode on next flush
+ persistBlocks = true;
+
try {
LOG.debug("Connecting to " + nodes[0].getName());
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
@@ -2182,7 +2220,7 @@
// @see FSOutputSummer#writeChunk()
@Override
- protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException {
checkOpen();
isClosed();
@@ -2219,6 +2257,8 @@
if (currentPacket == null) {
currentPacket = new Packet(packetSize, bytesCurBlock);
+ LOG.debug("DFSClient writeChunk allocating new packet " +
+ currentPacket.seqno);
}
currentPacket.writeInt(len);
@@ -2246,24 +2286,106 @@
currentPacket = null;
}
}
- //LOG.debug("DFSClient writeChunk with length " + len +
+ //LOG.debug("DFSClient writeChunk done length " + len +
// " checksum length " + cklen);
}
/**
- * Waits till all existing data is flushed and
- * confirmations received from datanodes.
+ * All data is written out to datanodes. It is not guaranteed
+ * that data has been flushed to persistent store on the
+ * datanode. Block allocations are persisted on namenode.
*/
@Override
public synchronized void flush() throws IOException {
+ Packet savePacket = null;
+ int position = 0;
+ long saveOffset = 0;
+
+ try {
+ // Record the state of the current output stream.
+ // This state will be reverted after the flush successfully
+ // finishes. It is necessary to do this so that partial
+ // checksum chunks are reused by writes that follow this
+ // flush.
+ if (currentPacket != null) {
+ savePacket = new Packet(currentPacket);
+ position = savePacket.buffer.position();
+ }
+ saveOffset = bytesCurBlock;
+
+ // flush checksum buffer, but keep checksum buffer intact
+ flushBuffer(true);
+
+ LOG.debug("DFSClient flushInternal save position " +
+ position +
+ " cur position " +
+ ((currentPacket != null) ? currentPacket.buffer.position() : -1) +
+ " limit " +
+ ((currentPacket != null) ? currentPacket.buffer.limit() : -1) +
+ " bytesCurBlock " + bytesCurBlock +
+ " lastFlushOffset " + lastFlushOffset);
+
+ //
+ // Detect the condition that we have already flushed all
+ // outstanding data.
+ //
+ boolean skipFlush = (lastFlushOffset == bytesCurBlock &&
+ savePacket != null && currentPacket != null &&
+ savePacket.seqno == currentPacket.seqno);
+
+ // Do the flush.
+ //
+ if (!skipFlush) {
+
+ // record the valid offset of this flush
+ lastFlushOffset = bytesCurBlock;
+
+ // wait for all packets to be sent and acknowledged
+ flushInternal();
+ }
+
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ //
+ bytesCurBlock = saveOffset;
+ currentPacket = null;
+ if (savePacket != null) {
+ savePacket.buffer.limit(savePacket.buffer.capacity());
+ savePacket.buffer.position(position);
+ savePacket.setFlushOffset(position,
+ savePacket.numChunks *
+ checksum.getBytesPerChecksum());
+ currentPacket = savePacket;
+ }
+
+ // If any new blocks were allocated since the last flush,
+ // then persist block locations on namenode.
+ //
+ if (persistBlocks) {
+ namenode.fsync(src, clientName);
+ persistBlocks = false;
+ }
+ } catch (IOException e) {
+ lastException = new IOException("IOException flush:" + e);
+ closed = true;
+ closeThreads();
+ throw e;
+ }
+ }
+
+ /**
+ * Waits till all existing data is flushed and confirmations
+ * received from datanodes.
+ */
+ private synchronized void flushInternal() throws IOException {
checkOpen();
isClosed();
-
+
while (!closed) {
synchronized (dataQueue) {
isClosed();
//
- // if there is data in the current buffer, send it across
+ // If there is data in the current buffer, send it across
//
if (currentPacket != null) {
currentPacket.buffer.flip();
@@ -2324,6 +2446,23 @@
s = null;
}
}
+
+ // shutdown datastreamer and responseprocessor threads.
+ private void closeThreads() throws IOException {
+ try {
+ streamer.close();
+ streamer.join();
+
+ // shutdown response after streamer has exited.
+ if (response != null) {
+ response.close();
+ response.join();
+ response = null;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to shutdown response thread");
+ }
+ }
/**
* Closes this output stream and releases any system
@@ -2334,36 +2473,27 @@
isClosed();
try {
- flushBuffer(); // flush from all upper layers
+ flushBuffer(); // flush from all upper layers
- // Mark that this packet is the last packet in block.
- // If there are no outstanding packets and the last packet
- // was not the last one in the current block, then create a
- // packet with empty payload.
- synchronized (dataQueue) {
- if (currentPacket == null && bytesCurBlock != 0) {
- currentPacket = new Packet(packetSize, bytesCurBlock);
- currentPacket.writeInt(0); // one chunk with empty contents
- }
- if (currentPacket != null) {
- currentPacket.lastPacketInBlock = true;
+ // Mark that this packet is the last packet in block.
+ // If there are no outstanding packets and the last packet
+ // was not the last one in the current block, then create a
+ // packet with empty payload.
+ synchronized (dataQueue) {
+ if (currentPacket == null && bytesCurBlock != 0) {
+ currentPacket = new Packet(packetSize, bytesCurBlock);
+ currentPacket.writeInt(0); // one chunk with empty contents
+ }
+ if (currentPacket != null) {
+ currentPacket.lastPacketInBlock = true;
+ currentPacket.setFlushOffset(0, 0); // send whole packet
+ }
}
- }
- flush(); // flush all data to Datanodes
+ flushInternal(); // flush all data to Datanodes
closed = true;
-
- // wait for threads to finish processing
- streamer.close();
- // wait for threads to exit
- streamer.join();
-
- // shutdown response after streamer has exited.
- if (response != null) {
- response.close();
- response.join();
- response = null;
- }
+
+ closeThreads();
synchronized (dataQueue) {
if (blockStream != null) {
@@ -2395,8 +2525,6 @@
}
}
}
- } catch (InterruptedException e) {
- throw new IOException("Failed to shutdown response thread");
} finally {
closed = true;
}
@@ -2406,7 +2534,7 @@
artificialSlowdown = period;
}
- void setChunksPerPacket(int value) {
+ synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = chunkSize * chunksPerPacket;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 11 14:13:29 2008
@@ -2080,12 +2080,33 @@
// this class is a bufferoutputstream that exposes the number of
// bytes in the buffer.
static private class DFSBufferedOutputStream extends BufferedOutputStream {
+ OutputStream out;
DFSBufferedOutputStream(OutputStream out, int capacity) {
super(out, capacity);
+ this.out = out;
}
- int count() {
- return count;
+ public synchronized void flush() throws IOException {
+ super.flush();
+ }
+
+ /**
+ * Returns true if the channel pointer is already set at the
+ * specified offset. Otherwise returns false.
+ */
+ synchronized boolean samePosition(FSDatasetInterface data,
+ FSDataset.BlockWriteStreams streams,
+ Block block,
+ long offset)
+ throws IOException {
+ if (data.getChannelPosition(block, streams) + count == offset) {
+ return true;
+ }
+ LOG.debug("samePosition is false. " +
+ " current position " + data.getChannelPosition(block, streams)+
+ " buffered size " + count +
+ " new offset " + offset);
+ return false;
}
}
@@ -2111,8 +2132,6 @@
private DataOutputStream mirrorOut;
private Daemon responder = null;
private Throttler throttler;
- private int lastLen = -1;
- private int curLen = -1;
private FSDataset.BlockWriteStreams streams;
private boolean isRecovery = false;
private String clientName;
@@ -2214,15 +2233,6 @@
+ " expected <= " + bytesPerChecksum);
}
- if (lastLen > 0 && lastLen != bytesPerChecksum) {
- throw new IOException("Got wrong length during receiveBlock(" + block
- + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
- + bytesPerChecksum);
- }
-
- lastLen = curLen;
- curLen = len;
-
in.readFully(buf, 0, len);
/*
@@ -2509,9 +2519,8 @@
}
return;
}
- if (data.getChannelPosition(block, streams) + bufStream.count() ==
- offsetInBlock) {
- return; // nothing to do
+ if (bufStream.samePosition(data, streams, block, offsetInBlock)) {
+ return;
}
if (offsetInBlock % bytesPerChecksum != 0) {
throw new IOException("setBlockPosition trying to set position to " +
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Tue Mar 11 14:13:29 2008
@@ -47,7 +47,6 @@
}
public void close() throws IOException {
- flush();
out.close();
}
}
@@ -63,8 +62,7 @@
}
public void close() throws IOException {
- flush();
- out.close();
+ out.close(); // This invokes PositionCache.close()
}
// Returns the underlying output stream. This is used by unit tests.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSOutputSummer.java Tue Mar 11 14:13:29 2008
@@ -97,7 +97,7 @@
// local buffer is empty and user data has one chunk
// checksum and output data
sum.update(b, off, buf.length);
- writeChecksumChunk(b, off, buf.length);
+ writeChecksumChunk(b, off, buf.length, false);
return buf.length;
}
@@ -118,20 +118,34 @@
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
- if(count != 0) {
+ flushBuffer(false);
+ }
+
+ /* Forces any buffered output bytes to be checksummed and written out to
+ * the underlying output stream. If keep is true, then the state of
+ * this object remains intact.
+ */
+ protected synchronized void flushBuffer(boolean keep) throws IOException {
+ if (count != 0) {
int chunkLen = count;
count = 0;
- writeChecksumChunk(buf, 0, chunkLen);
+ writeChecksumChunk(buf, 0, chunkLen, keep);
+ if (keep) {
+ count = chunkLen;
+ }
}
}
/* Generate checksum for the data chunk and output data chunk & checksum
- * to the underlying output stream
+ * to the underlying output stream. If keep is true then keep the
+ * current checksum intact, do not reset it.
*/
- private void writeChecksumChunk(byte b[], int off, int len)
+ private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
throws IOException {
int tempChecksum = (int)sum.getValue();
- sum.reset();
+ if (!keep) {
+ sum.reset();
+ }
checksum[0] = (byte)((tempChecksum >>> 24) & 0xFF);
checksum[1] = (byte)((tempChecksum >>> 16) & 0xFF);
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Mar 11 14:13:29 2008
@@ -958,11 +958,12 @@
}
if (out != null) {
- out.flush();
// Close the underlying stream iff we own it...
if (ownOutputStream) {
out.close();
+ } else {
+ out.flush();
}
out = null;
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java?rev=636104&r1=636103&r2=636104&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileAppend.java Tue Mar 11 14:13:29 2008
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileUtil.HardLink;
/**
@@ -39,6 +40,16 @@
static final int numBlocks = 10;
static final int fileSize = numBlocks * blockSize + 1;
boolean simulatedStorage = false;
+ byte[] fileContents = null;
+
+ //
+ // create a buffer that contains the entire test file data.
+ //
+ private void initBuffer(int size) {
+ Random rand = new Random(seed);
+ fileContents = new byte[size];
+ rand.nextBytes(fileContents);
+ }
/*
* creates a file but does not close it
@@ -61,6 +72,68 @@
stm.write(buffer);
}
+ //
+ // verify that the data written to the full blocks are sane
+ //
+ private void checkFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ boolean done = false;
+
+ // wait till all full blocks are confirmed by the datanodes.
+ while (!done) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ done = true;
+ String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+ if (locations.length < numBlocks) {
+ System.out.println("Number of blocks found " + locations.length);
+ done = false;
+ continue;
+ }
+ for (int idx = 0; idx < numBlocks; idx++) {
+ if (locations[idx].length < repl) {
+ System.out.println("Block index " + idx + " not yet replciated.");
+ done = false;
+ break;
+ }
+ }
+ }
+ FSDataInputStream stm = fileSys.open(name);
+ byte[] expected = new byte[numBlocks * blockSize];
+ if (simulatedStorage) {
+ for (int i= 0; i < expected.length; i++) {
+ expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
+ }
+ } else {
+ for (int i= 0; i < expected.length; i++) {
+ expected[i] = fileContents[i];
+ }
+ }
+ // do a sanity check. Read the file
+ byte[] actual = new byte[numBlocks * blockSize];
+ stm.readFully(0, actual);
+ checkData(actual, 0, expected, "Read 1");
+ }
+
+ private void checkFullFile(FileSystem fs, Path name) throws IOException {
+ FSDataInputStream stm = fs.open(name);
+ byte[] actual = new byte[fileSize];
+ stm.readFully(0, actual);
+ checkData(actual, 0, fileContents, "Read 2");
+ stm.close();
+ }
+
+ private void checkData(byte[] actual, int from, byte[] expected, String message) {
+ for (int idx = 0; idx < actual.length; idx++) {
+ assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+ expected[from+idx]+" actual "+actual[idx],
+ expected[from+idx], actual[idx]);
+ actual[idx] = 0;
+ }
+ }
+
+
/**
* Test that copy on write for blocks works correctly
*/
@@ -126,6 +199,104 @@
dataset.detachBlock(b, 1) == false);
}
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test a simple flush on a simple HDFS file.
+ */
+ public void testSimpleFlush() throws IOException {
+ Configuration conf = new Configuration();
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ initBuffer(fileSize);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ try {
+
+ // create a new file.
+ Path file1 = new Path("/simpleFlush.dat");
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+ System.out.println("Created file simpleFlush.dat");
+
+ // write to file
+ int mid = fileSize/2;
+ stm.write(fileContents, 0, mid);
+ stm.flush();
+ System.out.println("Wrote and Flushed first part of file.");
+
+ // write the remainder of the file
+ stm.write(fileContents, mid, fileSize - mid);
+ System.out.println("Written second part of file");
+ stm.flush();
+ stm.flush(); // two consecutive flushes is being tested here.
+ System.out.println("Wrote and Flushed second part of file.");
+
+ // verify that full blocks are sane
+ checkFile(fs, file1, 1);
+
+ stm.close();
+ System.out.println("Closed file.");
+
+ // verify that entire file is good
+ checkFullFile(fs, file1);
+
+ } catch (IOException e) {
+ System.out.println("Exception :" + e);
+ throw e;
+ } catch (Throwable e) {
+ System.out.println("Throwable :" + e);
+ e.printStackTrace();
+ throw new IOException("Throwable : " + e);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test that file data can be flushed.
+ */
+ public void testComplexFlush() throws IOException {
+ Configuration conf = new Configuration();
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ initBuffer(fileSize);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ try {
+
+ // create a new file.
+ Path file1 = new Path("/complexFlush.dat");
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+ System.out.println("Created file complexFlush.dat");
+
+ int start = 0;
+ for (start = 0; (start + 29) < fileSize; ) {
+ stm.write(fileContents, start, 29);
+ stm.flush();
+ start += 29;
+ }
+ stm.write(fileContents, start, fileSize-start);
+
+ // verify that full blocks are sane
+ checkFile(fs, file1, 1);
+ stm.close();
+
+ // verify that entire file is good
+ checkFullFile(fs, file1);
+ } catch (IOException e) {
+ System.out.println("Exception :" + e);
+ throw e;
+ } catch (Throwable e) {
+ System.out.println("Throwable :" + e);
+ e.printStackTrace();
+ throw new IOException("Throwable : " + e);
} finally {
fs.close();
cluster.shutdown();