You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2014/08/29 01:48:52 UTC
git commit: HDFS-6865. Byte array native checksumming on client side.
Contributed by James Thomas.
Repository: hadoop
Updated Branches:
refs/heads/trunk 48aa3b727 -> ab638e77b
HDFS-6865. Byte array native checksumming on client side. Contributed by James Thomas.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab638e77
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab638e77
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab638e77
Branch: refs/heads/trunk
Commit: ab638e77b811d9592470f7d342cd11a66efbbf0d
Parents: 48aa3b7
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Aug 28 16:44:09 2014 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Aug 28 16:44:09 2014 -0700
----------------------------------------------------------------------
.../apache/hadoop/fs/ChecksumFileSystem.java | 8 +-
.../java/org/apache/hadoop/fs/ChecksumFs.java | 8 +-
.../org/apache/hadoop/fs/FSOutputSummer.java | 107 ++++++++++++-------
.../org/apache/hadoop/util/DataChecksum.java | 2 +
.../org/apache/hadoop/util/NativeCrc32.java | 2 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 38 ++-----
.../org/apache/hadoop/hdfs/TestFileAppend.java | 4 +-
.../security/token/block/TestBlockToken.java | 2 +
.../namenode/TestBlockUnderConstruction.java | 3 +
.../namenode/TestDecommissioningStatus.java | 3 +
11 files changed, 108 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 511ca7f..c8d1b69 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -381,7 +381,8 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
long blockSize,
Progressable progress)
throws IOException {
- super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+ super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+ fs.getBytesPerSum()));
int bytesPerSum = fs.getBytesPerSum();
this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
replication, blockSize, progress);
@@ -405,10 +406,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
}
@Override
- protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+ protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+ int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
- sums.write(checksum);
+ sums.write(checksum, ckoff, cklen);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
index 4be3b29..ab5cd13 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
@@ -337,7 +337,8 @@ public abstract class ChecksumFs extends FilterFs {
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final boolean createParent) throws IOException {
- super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
+ super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
+ fs.getBytesPerSum()));
// checksumOpt is passed down to the raw fs. Unless it implements
// checksums internally, checksumOpt will be ignored.
@@ -370,10 +371,11 @@ public abstract class ChecksumFs extends FilterFs {
}
@Override
- protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+ protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
+ int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
- sums.write(checksum);
+ sums.write(checksum, ckoff, cklen);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 49c919a..19cbb6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.fs;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DataChecksum;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
/**
* This is a generic output stream for generating checksums for
* data before it is written to the underlying stream
@@ -33,7 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
abstract public class FSOutputSummer extends OutputStream {
// data checksum
- private Checksum sum;
+ private final DataChecksum sum;
// internal buffer for storing data before it is checksummed
private byte buf[];
// internal buffer for storing checksum
@@ -41,18 +42,24 @@ abstract public class FSOutputSummer extends OutputStream {
// The number of valid bytes in the buffer.
private int count;
- protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
+ // We want this value to be a multiple of 3 because the native code checksums
+ // 3 chunks simultaneously. The chosen value of 9 strikes a balance between
+ // limiting the number of JNI calls and flushing to the underlying stream
+ // relatively frequently.
+ private static final int BUFFER_NUM_CHUNKS = 9;
+
+ protected FSOutputSummer(DataChecksum sum) {
this.sum = sum;
- this.buf = new byte[maxChunkSize];
- this.checksum = new byte[checksumSize];
+ this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
+ this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0;
}
/* write the data chunk in <code>b</code> starting at <code>offset</code> with
- * a length of <code>len</code>, and its checksum
+ * a length of <code>len > 0</code>, and its checksum
*/
- protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
- throws IOException;
+ protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
+ byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
/**
* Check if the implementing OutputStream is closed and should no longer
@@ -66,7 +73,6 @@ abstract public class FSOutputSummer extends OutputStream {
/** Write one byte */
@Override
public synchronized void write(int b) throws IOException {
- sum.update(b);
buf[count++] = (byte)b;
if(count == buf.length) {
flushBuffer();
@@ -111,18 +117,17 @@ abstract public class FSOutputSummer extends OutputStream {
*/
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
- // local buffer is empty and user data has one chunk
- // checksum and output data
+ // local buffer is empty and user buffer size >= local buffer size, so
+ // simply checksum the user buffer and send it directly to the underlying
+ // stream
final int length = buf.length;
- sum.update(b, off, length);
- writeChecksumChunk(b, off, length, false);
+ writeChecksumChunks(b, off, length);
return length;
}
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
- sum.update(b, off, bytesToCopy);
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
@@ -136,22 +141,45 @@ abstract public class FSOutputSummer extends OutputStream {
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
- flushBuffer(false);
+ flushBuffer(false, true);
}
- /* Forces any buffered output bytes to be checksumed and written out to
- * the underlying output stream. If keep is true, then the state of
- * this object remains intact.
+ /* Forces buffered output bytes to be checksummed and written out to
+ * the underlying output stream. If there is a trailing partial chunk in the
+ * buffer,
+ * 1) flushPartial tells us whether to flush that chunk
+ * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
+ * buffer (if flushPartial is false, it is always kept in the buffer)
+ *
+ * Returns the number of bytes that were flushed but are still left in the
+ * buffer (can only be non-zero if keep is true).
*/
- protected synchronized void flushBuffer(boolean keep) throws IOException {
- if (count != 0) {
- int chunkLen = count;
+ protected synchronized int flushBuffer(boolean keep,
+ boolean flushPartial) throws IOException {
+ int bufLen = count;
+ int partialLen = bufLen % sum.getBytesPerChecksum();
+ int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
+ if (lenToFlush != 0) {
+ writeChecksumChunks(buf, 0, lenToFlush);
+ if (!flushPartial || keep) {
+ count = partialLen;
+ System.arraycopy(buf, bufLen - count, buf, 0, count);
+ } else {
count = 0;
- writeChecksumChunk(buf, 0, chunkLen, keep);
- if (keep) {
- count = chunkLen;
}
}
+
+ // total bytes left minus unflushed bytes left
+ return count - (bufLen - lenToFlush);
+ }
+
+ /**
+ * Checksums all complete data chunks and flushes them to the underlying
+ * stream. If there is a trailing partial chunk, it is not flushed and is
+ * maintained in the buffer.
+ */
+ public void flush() throws IOException {
+ flushBuffer(false, false);
}
/**
@@ -161,18 +189,18 @@ abstract public class FSOutputSummer extends OutputStream {
return count;
}
- /** Generate checksum for the data chunk and output data chunk & checksum
- * to the underlying output stream. If keep is true then keep the
- * current checksum intact, do not reset it.
+ /** Generate checksums for the given data chunks and output chunks & checksums
+ * to the underlying output stream.
*/
- private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
+ private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
- int tempChecksum = (int)sum.getValue();
- if (!keep) {
- sum.reset();
+ sum.calculateChunkedSums(b, off, len, checksum, 0);
+ for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+ int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+ int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
+ writeChunk(b, off + i, chunkLen, checksum, ckOffset,
+ sum.getChecksumSize());
}
- int2byte(tempChecksum, checksum);
- writeChunk(b, off, len, checksum);
}
/**
@@ -196,9 +224,14 @@ abstract public class FSOutputSummer extends OutputStream {
/**
* Resets existing buffer with a new one of the specified size.
*/
- protected synchronized void resetChecksumChunk(int size) {
- sum.reset();
+ protected synchronized void setChecksumBufSize(int size) {
this.buf = new byte[size];
+ this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
+ sum.getChecksumSize()];
this.count = 0;
}
+
+ protected synchronized void resetChecksumBufSize() {
+ setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 1636af6..9f0ee35 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -339,6 +339,7 @@ public class DataChecksum implements Checksum {
byte[] data, int dataOff, int dataLen,
byte[] checksums, int checksumsOff, String fileName,
long basePos) throws ChecksumException {
+ if (type.size == 0) return;
if (NativeCrc32.isAvailable()) {
NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
@@ -421,6 +422,7 @@ public class DataChecksum implements Checksum {
public void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) {
+ if (type.size == 0) return;
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
index 2f21ae1..0807d2c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCrc32.java
@@ -42,7 +42,7 @@ class NativeCrc32 {
* modified.
*
* @param bytesPerSum the chunk size (eg 512 bytes)
- * @param checksumType the DataChecksum type constant
+ * @param checksumType the DataChecksum type constant (NULL is not supported)
* @param sums the DirectByteBuffer pointing at the beginning of the
* stored checksums
* @param data the DirectByteBuffer pointing at the beginning of the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2c56407..8268b6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -434,6 +434,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6773. MiniDFSCluster should skip edit log fsync by default (Stephen
Chu via Colin Patrick McCabe)
+ HDFS-6865. Byte array native checksumming on client side
+ (James Thomas via todd)
+
BUG FIXES
HDFS-6823. dfs.web.authentication.kerberos.principal shows up in logs for
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 14977a2..0b5ecda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -398,7 +398,7 @@ public class DFSOutputStream extends FSOutputSummer
// one chunk that fills up the partial chunk.
//
computePacketChunkSize(0, freeInCksum);
- resetChecksumChunk(freeInCksum);
+ setChecksumBufSize(freeInCksum);
appendChunk = true;
} else {
// if the remaining space in the block is smaller than
@@ -1563,7 +1563,7 @@ public class DFSOutputStream extends FSOutputSummer
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
- super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+ super(checksum);
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
@@ -1717,22 +1717,21 @@ public class DFSOutputStream extends FSOutputSummer
// @see FSOutputSummer#writeChunk()
@Override
- protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
- throws IOException {
+ protected synchronized void writeChunk(byte[] b, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
- int cklen = checksum.length;
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
- if (checksum.length != this.checksum.getChecksumSize()) {
+ if (cklen != this.checksum.getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() +
- " but found to be " + checksum.length);
+ " but found to be " + cklen);
}
if (currentPacket == null) {
@@ -1748,7 +1747,7 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- currentPacket.writeChecksum(checksum, 0, cklen);
+ currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
@@ -1772,7 +1771,7 @@ public class DFSOutputStream extends FSOutputSummer
// crc chunks from now on.
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
appendChunk = false;
- resetChecksumChunk(bytesPerChecksum);
+ resetChecksumBufSize();
}
if (!appendChunk) {
@@ -1853,20 +1852,13 @@ public class DFSOutputStream extends FSOutputSummer
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
synchronized (this) {
- /* Record current blockOffset. This might be changed inside
- * flushBuffer() where a partial checksum chunk might be flushed.
- * After the flush, reset the bytesCurBlock back to its previous value,
- * any partial checksum chunk will be sent now and in next packet.
- */
- long saveOffset = bytesCurBlock;
- Packet oldCurrentPacket = currentPacket;
// flush checksum buffer, but keep checksum buffer intact
- flushBuffer(true);
+ int numKept = flushBuffer(true, true);
// bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
- "DFSClient flush() : saveOffset " + saveOffset +
+ "DFSClient flush() :" +
" bytesCurBlock " + bytesCurBlock +
" lastFlushOffset " + lastFlushOffset);
}
@@ -1883,14 +1875,6 @@ public class DFSOutputStream extends FSOutputSummer
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
}
} else {
- // We already flushed up to this offset.
- // This means that we haven't written anything since the last flush
- // (or the beginning of the file). Hence, we should not have any
- // packet queued prior to this call, since the last flush set
- // currentPacket = null.
- assert oldCurrentPacket == null :
- "Empty flush should not occur with a currentPacket";
-
if (isSync && bytesCurBlock > 0) {
// Nothing to send right now,
// and the block was partially written,
@@ -1910,7 +1894,7 @@ public class DFSOutputStream extends FSOutputSummer
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
- bytesCurBlock = saveOffset;
+ bytesCurBlock -= numKept;
toWaitFor = lastQueuedSeqno;
} // end synchronized
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index d840077..34c701d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -261,7 +261,9 @@ public class TestFileAppend{
start += 29;
}
stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start);
-
+ // need to make sure we completely write out all full blocks before
+ // the checkFile() call (see FSOutputSummer#flush)
+ stm.flush();
// verify that full blocks are sane
checkFile(fs, file1, 1);
stm.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 2429345..1fe7ba8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -394,6 +394,8 @@ public class TestBlockToken {
Path filePath = new Path(fileName);
FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(new byte[1000]);
+ // ensure that the first block is written out (see FSOutputSummer#flush)
+ out.flush();
LocatedBlocks locatedBlocks = cluster.getNameNodeRpc().getBlockLocations(
fileName, 0, 1000);
while (locatedBlocks.getLastLocatedBlock() == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
index 5448e7a..872ff9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
@@ -70,6 +70,9 @@ public class TestBlockUnderConstruction {
long blocksBefore = stm.getPos() / BLOCK_SIZE;
TestFileCreation.writeFile(stm, BLOCK_SIZE);
+ // need to make sure the full block is completely flushed to the DataNodes
+ // (see FSOutputSummer#flush)
+ stm.flush();
int blocksAfter = 0;
// wait until the block is allocated by DataStreamer
BlockLocation[] locatedBlocks;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab638e77/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index d01df75..2ee251b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -141,6 +141,9 @@ public class TestDecommissioningStatus {
Random rand = new Random(seed);
rand.nextBytes(buffer);
stm.write(buffer);
+ // need to make sure that we actually write out both file blocks
+ // (see FSOutputSummer#flush)
+ stm.flush();
// Do not close stream, return it
// so that it is not garbage collected
return stm;